This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 156784b5 Develop cpp (#762)
156784b5 is described below

commit 156784b540328076c4c43fee242306ac94977dbb
Author: Zhanhui Li <lizhan...@gmail.com>
AuthorDate: Wed Jun 26 19:07:14 2024 +0800

    Develop cpp (#762)
    
    * feat: add tagged-release github action pipeline
    
    Signed-off-by: Zhanhui Li <lizhan...@gmail.com>
    
    * Fix C++ SDK core dump issue (#2)
    
    * fix: sync namespace from server Settings
    
    * feat: use opentelemetry for tracing/metrics/logging
    
    * Remove broken links and add targets to generate compile_commands.json
    
    Signed-off-by: lizhanhui <lizhan...@gmail.com>
    
    * fix: timer task may invoke a call to a destructing stream
    
    Signed-off-by: Zhanhui Li <lizhan...@gmail.com>
    
    * fix: update document as we have changed the way to generate compile_commands.json
    
    * fix: static_cast StreamState to std::uint8_t as enum class by default is not formattable
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    ---------
    
    Signed-off-by: lizhanhui <lizhan...@gmail.com>
    Signed-off-by: Zhanhui Li <lizhan...@gmail.com>
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: fix stream state transition with gRPC reactor
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: revamp TelemetryBidiRecator
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * feat: explicitly control exported symbols
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: release write hold when OnReadDone with ok=false and there is no inflight write
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * feat: revamp TelemetryBidiReactor states and their transition graph
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: example publish message QPS stats lambda
    
    * Fifo opt (#732)
    
    * Prepare to optimize FIFO publishing
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: SendReceipt now contains std::unique_ptr<Message> being sent
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: add doc explaining why we taking ownership of the message being sent
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * feat: implement FifoProducerPartition
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * feat: implement FifoProducerImpl
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * feat: implement builder for FifoProducer
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: prepare to debug
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: log sending sending stages
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    ---------
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: spell errors
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: markdown code blocks should be fenced by blank lines
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: copyright header
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: copyright header
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: include statements
    
    Signed-off-by: Zhanhui Li <lizhan...@gmail.com>
    
    * feat: build example_fifo_producer in BUILD.bazel
    
    Signed-off-by: Zhanhui Li <lizhan...@gmail.com>
    
    * fix: #713 correct misuse of absl::make_optional
    
    Signed-off-by: Zhanhui Li <lizhan...@gmail.com>
    
    ---------
    
    Signed-off-by: Zhanhui Li <lizhan...@gmail.com>
    Signed-off-by: lizhanhui <lizhan...@gmail.com>
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
---
 .github/workflows/cpp_tagged_release.yaml          |  57 ++++
 .licenserc.yaml                                    |   1 +
 cpp/CMakeLists.txt                                 |   4 +-
 cpp/README.md                                      |  23 +-
 cpp/WORKSPACE                                      |  26 +-
 cpp/bazel/rocketmq_deps.bzl                        |  25 +-
 cpp/examples/BUILD.bazel                           |  11 +
 cpp/examples/CMakeLists.txt                        |   1 +
 ...oducerWithAsync.cpp => ExampleFifoProducer.cpp} |  71 ++--
 cpp/examples/ExampleProducer.cpp                   |   7 +-
 cpp/examples/ExampleProducerWithAsync.cpp          |   8 +-
 cpp/examples/ExampleProducerWithFifoMessage.cpp    |  10 +-
 cpp/examples/ExampleProducerWithTimedMessage.cpp   |   7 +-
 .../ExampleProducerWithTransactionalMessage.cpp    |   7 +-
 cpp/examples/ExamplePushConsumer.cpp               |   4 +-
 cpp/examples/ExampleSimpleConsumer.cpp             |   4 +-
 cpp/include/rocketmq/Configuration.h               |   6 +-
 cpp/include/rocketmq/FifoProducer.h                |  68 ++++
 cpp/include/rocketmq/Producer.h                    |   5 +-
 cpp/include/rocketmq/SendReceipt.h                 |   7 +-
 cpp/source/CMakeLists.txt                          |   3 +
 cpp/source/base/Configuration.cpp                  |   4 +-
 cpp/source/client/ClientManagerImpl.cpp            | 152 +++++----
 cpp/source/client/SessionImpl.cpp                  |   6 +-
 cpp/source/client/TelemetryBidiReactor.cpp         | 372 ++++++++++++++++-----
 cpp/source/client/include/ClientManager.h          |  10 +-
 cpp/source/client/include/ClientManagerImpl.h      |  14 +-
 .../client/include/SendResult.h}                   |  14 +-
 .../client/include/SendResultCallback.h}           |  14 +-
 cpp/source/client/include/TelemetryBidiReactor.h   | 110 ++++--
 cpp/source/exports.map                             |   6 +
 .../rocketmq/FifoContext.cpp}                      |  21 +-
 cpp/source/rocketmq/FifoProducer.cpp               |  73 ++++
 .../rocketmq/FifoProducerImpl.cpp}                 |  22 +-
 cpp/source/rocketmq/FifoProducerPartition.cpp      | 107 ++++++
 cpp/source/rocketmq/Producer.cpp                   |   4 -
 cpp/source/rocketmq/ProducerImpl.cpp               |  51 +--
 cpp/source/rocketmq/SendContext.cpp                |  53 +--
 cpp/source/rocketmq/include/ClientImpl.h           |   4 +-
 .../rocketmq/include/FifoContext.h}                |  17 +-
 cpp/source/rocketmq/include/FifoProducerImpl.h     |  53 +++
 .../rocketmq/include/FifoProducerPartition.h       |  55 +++
 cpp/source/rocketmq/include/ProducerImpl.h         |  28 +-
 cpp/source/rocketmq/include/SendContext.h          |  10 +-
 cpp/source/rocketmq/include/SimpleConsumerImpl.h   |   2 +-
 cpp/source/rocketmq/tests/BUILD.bazel              |   8 +
 .../rocketmq/tests/OptionalTest.cpp}               |  21 +-
 cpp/source/stats/MetricBidiReactor.cpp             |  40 ++-
 cpp/source/stats/include/MetricBidiReactor.h       |  24 +-
 ...trouble_shooting.sh => gen_compile_commands.sh} |   7 +-
 cpp/tools/trouble_shooting.sh                      |   0
 51 files changed, 1221 insertions(+), 436 deletions(-)
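
The commit message notes that `StreamState` is cast to `std::uint8_t` because a scoped enum is not formattable by default. The sketch below illustrates that idea with the standard library only; the enumerators and the helper names `toPrintable` and `describe` are hypothetical and are not taken from the SDK, which logs through its own logging stack.

```cpp
#include <cstdint>
#include <sstream>
#include <string>

// Hypothetical stand-in for the SDK's StreamState; enumerators are illustrative only.
enum class StreamState : std::uint8_t {
  Created = 0,
  Ready = 1,
  Closed = 2,
};

// A scoped enum has no implicit conversion to an integer, so it must be cast
// explicitly before it can be streamed or formatted as a number.
// The second cast widens to unsigned because streaming std::uint8_t prints a character.
inline unsigned toPrintable(StreamState state) {
  return static_cast<unsigned>(static_cast<std::uint8_t>(state));
}

// Builds a log-friendly description of the state.
inline std::string describe(StreamState state) {
  std::ostringstream oss;
  oss << "stream-state=" << toPrintable(state);
  return oss.str();
}
```

With a formatting library the same cast would typically be applied at the call site, so the library only ever sees a plain integer.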

diff --git a/.github/workflows/cpp_tagged_release.yaml b/.github/workflows/cpp_tagged_release.yaml
new file mode 100644
index 00000000..416676c6
--- /dev/null
+++ b/.github/workflows/cpp_tagged_release.yaml
@@ -0,0 +1,57 @@
+---
+name: "cpp-tagged-release"
+
+on:
+  push:
+    tags:
+      - "cpp-*"
+
+jobs:
+  tagged-release:
+    name: "C++ Tagged Release"
+    runs-on: "ubuntu-latest"
+
+    steps:
+      - uses: actions/checkout@v4
+      - uses: actions/checkout@v4
+        with:
+          repository: grpc/grpc
+          # The branch, tag or SHA to checkout. When checking out the repository that
+          # triggered a workflow, this defaults to the reference or SHA for that event.
+          # Otherwise, uses the default branch.
+          ref: 'v1.46.3'
+          # Relative path under $GITHUB_WORKSPACE to place the repository
+          path: cpp/repo/grpc
+          submodules: true
+      - name: "Install Dependencies"
+        run: |
+            sudo apt-get install -y build-essential autoconf libtool pkg-config cmake git libprotobuf-dev libssl-dev zlib1g-dev libgflags-dev
+      - name: "Build gRPC"
+        working-directory: ./cpp/repo/grpc  
+        run: |
+          mkdir _build && cd _build
+          cmake -DCMAKE_INSTALL_PREFIX=$HOME/grpc -DgRPC_SSL_PROVIDER=package -DgRPC_ZLIB_PROVIDER=package -DgRPC_PROTOBUF_PACKAGE_TYPE=CONFIG -DgRPC_ZLIB_PROVIDER=package ..
+          make
+          make install
+      - name: "Build Libraries"
+        working-directory: ./cpp
+        run: |
+          mkdir _build && cd _build
+          cmake .. 
+          make
+      - name: "Package"
+        working-directory: ./cpp
+        run: |
+          mkdir -p dist/lib
+          mkdir -p dist/include
+          cp -r include/rocketmq dist/include/
+          cp _build/librocketmq.so dist/lib/
+          cp _build/librocketmq.a  dist/lib/
+          tar -czvf dist.tar.gz dist
+      - uses: "marvinpinto/action-automatic-releases@latest"
+        with:
+          repo_token: "${{ secrets.GITHUB_TOKEN }}"
+          prerelease: false
+          automatic_release_tag: cpp
+          files: |
+            cpp/dist.tar.gz
\ No newline at end of file
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 4edcd18f..cb5c8747 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -48,6 +48,7 @@ header:
     - 'cpp/.gitignore'
     - 'cpp/third_party'
     - 'cpp/cmake'
+    - 'cpp/source/exports.map'
     - 'php/grpc/**/*.php'
     - 'php/composer.json'
     - 'rust/.cargo/Cargo.lock.min'
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 374466e6..42f7cd70 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -1,4 +1,4 @@
-cmake_minimum_required(VERSION 3.19)
+cmake_minimum_required(VERSION 3.16)
 project(rocketmq)
 set(CMAKE_CXX_STANDARD 11)
 set(CMAKE_POSITION_INDEPENDENT_CODE ON)
@@ -31,4 +31,4 @@ if (BUILD_EXAMPLES)
     find_package(gflags REQUIRED)
     find_package(ZLIB REQUIRED)
     add_subdirectory(examples)
-endif ()
\ No newline at end of file
+endif ()
diff --git a/cpp/README.md b/cpp/README.md
index 117a2426..a6d983fb 100644
--- a/cpp/README.md
+++ b/cpp/README.md
@@ -217,19 +217,13 @@ if "com_google_googletest" not in native.existing_rules():
 1. VSCode + Clangd
 
    [Clangd](https://clangd.llvm.org/) is a really nice code completion tool. Clangd requires compile_commands.json to work properly.
-   To generate the file, we need clone another repository along with the current one.
+   To generate the file, run the following command:
 
    ```sh
-   git clone g...@github.com:grailbio/bazel-compilation-database.git
+    ./tools/gen_compile_commands.sh
    ```
 
-   From current repository root,
-
-   ```sh
-   ../bazel-compilation-database/generate.sh
-   ```
-
-   Once the script completes, you should have compile_commands.json file in the repository root directory.
+   Once the script completes, you should have compile_commands.json file in the workspace directory, aka, ${repository}/cpp.
 
    LLVM project has an extension for [clangd](https://marketplace.visualstudio.com/items?itemName=llvm-vs-code-extensions.vscode-clangd). Please install it from the extension market.
 
@@ -239,8 +233,15 @@ if "com_google_googletest" not in native.existing_rules():
       "C_Cpp.intelliSenseEngine": "Disabled",
       "C_Cpp.autocomplete": "Disabled", // So you don't get autocomplete from both extensions.
       "C_Cpp.errorSquiggles": "Disabled", // So you don't get error squiggles from both extensions (clangd's seem to be more reliable anyway).
-      "clangd.path": "/Users/lizhanhui/usr/clangd_12.0.0/bin/clangd",
-      "clangd.arguments": ["-log=verbose", "-pretty", "--background-index"],
+      "clangd.path": "/usr/bin/clangd",
+      "clangd.arguments": [
+         "-log=verbose",
+         "-pretty",
+         "--background-index",
+         "--header-insertion=never",
+         "--compile-commands-dir=${workspaceFolder}/",
+         "--query-driver=**"
+      ],
       "clangd.onConfigChanged": "restart",
    ```
 
diff --git a/cpp/WORKSPACE b/cpp/WORKSPACE
index d09dd445..3c3d6476 100644
--- a/cpp/WORKSPACE
+++ b/cpp/WORKSPACE
@@ -27,4 +27,28 @@ http_archive(
 load("@io_buildbuddy_buildbuddy_toolchain//:deps.bzl", "buildbuddy_deps")
 buildbuddy_deps()
 load("@io_buildbuddy_buildbuddy_toolchain//:rules.bzl", "buildbuddy")
-buildbuddy(name = "buildbuddy_toolchain")
\ No newline at end of file
+buildbuddy(name = "buildbuddy_toolchain")
+
+# Generate compile_commands.json
+load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+
+
+# Hedron's Compile Commands Extractor for Bazel
+# https://github.com/hedronvision/bazel-compile-commands-extractor
+http_archive(
+    name = "hedron_compile_commands",
+
+    # Replace the commit hash (0e990032f3c5a866e72615cf67e5ce22186dcb97) in both places (below) with the latest (https://github.com/hedronvision/bazel-compile-commands-extractor/commits/main), rather than using the stale one here.
+    # Even better, set up Renovate and let it do the work for you (see "Suggestion: Updates" in the README).
+    url = "https://github.com/hedronvision/bazel-compile-commands-extractor/archive/204aa593e002cbd177d30f11f54cff3559110bb9.tar.gz",
+    strip_prefix = "bazel-compile-commands-extractor-204aa593e002cbd177d30f11f54cff3559110bb9",
+    # When you first run this tool, it'll recommend a sha256 hash to put here with a message like: "DEBUG: Rule 'hedron_compile_commands' indicated that a canonical reproducible form can be obtained by modifying arguments sha256 = ..."
+)
+load("@hedron_compile_commands//:workspace_setup.bzl", "hedron_compile_commands_setup")
+hedron_compile_commands_setup()
+load("@hedron_compile_commands//:workspace_setup_transitive.bzl", "hedron_compile_commands_setup_transitive")
+hedron_compile_commands_setup_transitive()
+load("@hedron_compile_commands//:workspace_setup_transitive_transitive.bzl", "hedron_compile_commands_setup_transitive_transitive")
+hedron_compile_commands_setup_transitive_transitive()
+load("@hedron_compile_commands//:workspace_setup_transitive_transitive_transitive.bzl", "hedron_compile_commands_setup_transitive_transitive_transitive")
+hedron_compile_commands_setup_transitive_transitive_transitive()
\ No newline at end of file
diff --git a/cpp/bazel/rocketmq_deps.bzl b/cpp/bazel/rocketmq_deps.bzl
index eae31a6f..684e55eb 100644
--- a/cpp/bazel/rocketmq_deps.bzl
+++ b/cpp/bazel/rocketmq_deps.bzl
@@ -16,7 +16,6 @@ def rocketmq_deps():
         sha256 = "b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5",
         strip_prefix = "googletest-release-1.11.0",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googletest/googletest-release-1.11.0.tar.gz",
             "https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz",
         ],
     )
@@ -27,7 +26,6 @@ def rocketmq_deps():
         strip_prefix = "filesystem-1.5.0",
         sha256 = "eb6f3b0739908ad839cde68885d70e7324db191b9fad63d9915beaa40444d9cb",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/filesystem/filesystem-1.5.0.tar.gz",
             "https://github.com/gulrak/filesystem/archive/v1.5.0.tar.gz",
         ],
         build_file = "@org_apache_rocketmq//third_party:filesystem.BUILD",
@@ -39,7 +37,6 @@ def rocketmq_deps():
         strip_prefix = "spdlog-1.9.2",
         sha256 = "6fff9215f5cb81760be4cc16d033526d1080427d236e86d70bb02994f85e3d38",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/spdlog/spdlog-1.9.2.tar.gz",
             "https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz",
         ],
         build_file = "@org_apache_rocketmq//third_party:spdlog.BUILD",
@@ -51,7 +48,6 @@ def rocketmq_deps():
         strip_prefix = "fmt-8.0.1",
         sha256 = "b06ca3130158c625848f3fb7418f235155a4d389b2abc3a6245fb01cb0eb1e01",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/fmt/fmt-8.0.1.tar.gz",
             "https://github.com/fmtlib/fmt/archive/refs/tags/8.0.1.tar.gz",
         ],
         build_file = "@org_apache_rocketmq//third_party:fmtlib.BUILD",
@@ -63,7 +59,6 @@ def rocketmq_deps():
         sha256 = "8b28fdd45bab62d15db232ec404248901842e5340299a57765e48abe8a80d930",
         strip_prefix = "protobuf-3.20.1",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/protobuf/protobuf-3.20.1.tar.gz",
             "https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.20.1.tar.gz",
         ],
     )
@@ -74,7 +69,6 @@ def rocketmq_deps():
         sha256 = "507e38c8d95c7efa4f3b1c0595a8e8f139c885cb41a76cab7e20e4e67ae87731",
         strip_prefix = "rules_proto_grpc-4.1.1",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto_grpc/rules_proto_grpc-4.1.1.tar.gz",
             "https://github.com/rules-proto-grpc/rules_proto_grpc/archive/refs/tags/4.1.1.tar.gz",
         ],
     )
@@ -84,7 +78,6 @@ def rocketmq_deps():
         name = "io_opencensus_cpp",
         sha256 = "317f2bfdaba469561c7e64b1a55282b87e677c109c9d8877097940e6d5cbca08",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/opencensus-cpp/opencensus-cpp-0.4.1.tar.gz",
             "https://github.com/lizhanhui/opencensus-cpp/archive/refs/tags/v0.4.1.tar.gz",
         ],
         strip_prefix = "opencensus-cpp-0.4.1",
@@ -96,7 +89,6 @@ def rocketmq_deps():
         sha256 = "dcf71b9cba8dc0ca9940c4b316a0c796be8fab42b070bb6b7cab62b48f0e66c4",
         strip_prefix = "abseil-cpp-20211102.0",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/abseil/abseil-cpp-20211102.0.tar.gz",
             "https://github.com/abseil/abseil-cpp/archive/refs/tags/20211102.0.tar.gz",
         ],
     )
@@ -107,7 +99,6 @@ def rocketmq_deps():
         strip_prefix = "gflags-2.2.2",
         sha256 = "34af2f15cf7367513b352bdcd2493ab14ce43692d2dcd9dfc499492966c64dcf",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/gflags/gflags-2.2.2.tar.gz",
             "https://github.com/gflags/gflags/archive/refs/tags/v2.2.2.tar.gz",
         ],
     )
@@ -118,7 +109,6 @@ def rocketmq_deps():
         strip_prefix = "grpc-1.46.3",
         sha256 = "d6cbf22cb5007af71b61c6be316a79397469c58c82a942552a62e708bce60964",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/grpc/grpc-1.46.3.tar.gz",
             "https://github.com/grpc/grpc/archive/refs/tags/v1.46.3.tar.gz",
         ],
     )
@@ -130,7 +120,6 @@ def rocketmq_deps():
         build_file = "@org_apache_rocketmq//third_party:asio.BUILD",
         strip_prefix = "asio-1.18.2",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/asio/asio-1.18.2.tar.gz",
             "https://github.com/lizhanhui/asio/archive/refs/tags/v1.18.2.tar.gz",
         ],
     )
@@ -140,7 +129,6 @@ def rocketmq_deps():
         name = "com_google_googleapis",
         sha256 = "e89f15d54b0ddab0cd41d18cb2299e5447db704e2b05ff141cb1769170671466",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googleapis/googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95.zip",
             "https://github.com/googleapis/googleapis/archive/af7fb72df59a814221b123a4d1acb3f6c3e6cc95.zip",
         ],
         strip_prefix = "googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95",
@@ -152,7 +140,6 @@ def rocketmq_deps():
         sha256 = "cdf6b84084aad8f10bf20b46b77cb48d83c319ebe6458a18e9d2cebf57807cdd",
         strip_prefix = "rules_python-0.8.1",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-python/rules_python-0.8.1.tar.gz",
             "https://github.com/bazelbuild/rules_python/archive/refs/tags/0.8.1.tar.gz",
         ],
     )
@@ -161,7 +148,6 @@ def rocketmq_deps():
         http_archive,
         name = "rules_swift",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_swift/rules_swift-0.27.0.tar.gz",
             "https://github.com/bazelbuild/rules_swift/archive/refs/tags/0.27.0.tar.gz",
         ],
         strip_prefix = "rules_swift-0.27.0",
@@ -172,7 +158,6 @@ def rocketmq_deps():
         name = "io_bazel_rules_go",
         sha256 = "685052b498b6ddfe562ca7a97736741d87916fe536623afb7da2824c0211c369",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-go/rules_go-v0.33.0.zip",
             "https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.33.0/rules_go-v0.33.0.zip",
             "https://github.com/bazelbuild/rules_go/releases/download/v0.33.0/rules_go-v0.33.0.zip",
         ],
@@ -184,7 +169,15 @@ def rocketmq_deps():
         sha256 = "e017528fd1c91c5a33f15493e3a398181a9e821a804eb7ff5acdd1d2d6c2b18d",
         strip_prefix = "rules_proto-4.0.0-3.20.0",
         urls = [
-            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto/rules_proto-4.0.0-3.20.0.tar.gz",
             "https://github.com/bazelbuild/rules_proto/archive/refs/tags/4.0.0-3.20.0.tar.gz",
         ],
     )
+
+    maybe(
+        http_archive,
+        name = "com_github_opentelemetry",
+        strip_prefix = "opentelemetry-cpp-1.14.2",
+        urls = [
+            "https://github.com/open-telemetry/opentelemetry-cpp/archive/refs/tags/v1.14.2.tar.gz"
+        ]
+    )
diff --git a/cpp/examples/BUILD.bazel b/cpp/examples/BUILD.bazel
index 5113e150..771b0e04 100644
--- a/cpp/examples/BUILD.bazel
+++ b/cpp/examples/BUILD.bazel
@@ -91,4 +91,15 @@ cc_binary(
         "//source/rocketmq:rocketmq_library",
         "@com_github_gflags_gflags//:gflags",
     ],
+)
+
+cc_binary(
+    name = "example_fifo_producer",
+    srcs = [
+        "ExampleFifoProducer.cpp",
+    ],
+    deps = [
+        "//source/rocketmq:rocketmq_library",
+        "@com_github_gflags_gflags//:gflags",
+    ]
 )
\ No newline at end of file
diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt
index 8d6b0399..27304477 100644
--- a/cpp/examples/CMakeLists.txt
+++ b/cpp/examples/CMakeLists.txt
@@ -4,6 +4,7 @@ function(add_example name file)
 endfunction()
 
 add_example(example_producer ExampleProducer.cpp)
+add_example(example_fifo_producer ExampleFifoProducer.cpp)
 add_example(example_producer_with_async ExampleProducerWithAsync.cpp)
 add_example(example_producer_with_fifo_message ExampleProducerWithFifoMessage.cpp)
 add_example(example_producer_with_timed_message ExampleProducerWithTimedMessage.cpp)
diff --git a/cpp/examples/ExampleProducerWithAsync.cpp b/cpp/examples/ExampleFifoProducer.cpp
similarity index 73%
copy from cpp/examples/ExampleProducerWithAsync.cpp
copy to cpp/examples/ExampleFifoProducer.cpp
index 62ee7781..9d99be36 100644
--- a/cpp/examples/ExampleProducerWithAsync.cpp
+++ b/cpp/examples/ExampleFifoProducer.cpp
@@ -17,16 +17,19 @@
 #include <algorithm>
 #include <atomic>
 #include <condition_variable>
-#include <cstdint>
 #include <iostream>
-#include <mutex>
+#include <memory>
 #include <random>
 #include <string>
 #include <system_error>
 
 #include "gflags/gflags.h"
+#include "rocketmq/CredentialsProvider.h"
+#include "rocketmq/FifoProducer.h"
+#include "rocketmq/Logger.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/Producer.h"
+#include "rocketmq/SendReceipt.h"
 
 using namespace ROCKETMQ_NAMESPACE;
 
@@ -94,30 +97,33 @@ DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are publi
 DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
 DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
-DEFINE_uint32(concurrency, 128, "Concurrency of async send");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
+DEFINE_uint32(concurrency, 16, "Concurrency of FIFO producer");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
 
   auto& logger = getLogger();
-  logger.setConsoleLevel(Level::Info);
-  logger.setLevel(Level::Info);
+  logger.setConsoleLevel(Level::Debug);
+  logger.setLevel(Level::Debug);
   logger.init();
 
+  // Access Key/Secret pair may be acquired from management console
   CredentialsProviderPtr credentials_provider;
   if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
     credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret);
   }
 
-  // In most case, you don't need to create too many producers, singletion pattern is recommended.
-  auto producer = Producer::newBuilder()
+  // In most case, you don't need to create too many producers, singleton pattern is recommended.
+  auto producer = FifoProducer::newBuilder()
                       .withConfiguration(Configuration::newBuilder()
                                              .withEndpoints(FLAGS_access_point)
                                              .withCredentialsProvider(credentials_provider)
-                                             .withSsl(true)
+                                             .withSsl(FLAGS_tls)
                                              .build())
+                      .withConcurrency(FLAGS_concurrency)
                       .withTopics({FLAGS_topic})
                       .build();
 
@@ -127,8 +133,8 @@ int main(int argc, char* argv[]) {
   auto stats_lambda = [&] {
     while (!stopped.load(std::memory_order_relaxed)) {
       long cnt = count.load(std::memory_order_relaxed);
-      while (count.compare_exchange_weak(cnt, 0)) {
-        break;
+      while (!count.compare_exchange_weak(cnt, 0)) {
+        cnt = count.load(std::memory_order_relaxed);
       }
       std::this_thread::sleep_for(std::chrono::seconds(1));
       std::cout << "QPS: " << cnt << std::endl;
@@ -145,25 +151,32 @@ int main(int argc, char* argv[]) {
 
   std::unique_ptr<Semaphore> semaphore(new Semaphore(FLAGS_concurrency));
 
-  auto send_callback = [&](const std::error_code& ec, const SendReceipt& receipt) {
-    std::unique_lock<std::mutex> lk(mtx);
-    semaphore->release();
-    completed++;
-    count++;
-    if (completed >= FLAGS_total) {
-      cv.notify_all();
+  try {
+    for (std::size_t i = 0; i < FLAGS_total; ++i) {
+      auto message = Message::newBuilder()
+                         .withTopic(FLAGS_topic)
+                         .withTag("TagA")
+                         .withKeys({"Key-" + std::to_string(i)})
+                         .withGroup("message-group" + std::to_string(i % FLAGS_concurrency))
+                         .withBody(body)
+                         .build();
+      std::error_code ec;
+      auto callback = [&](const std::error_code& ec, const SendReceipt& receipt) mutable {
+        completed++;
+        count++;
+        semaphore->release();
+
+        if (completed >= FLAGS_total) {
+          cv.notify_all();
+        }
+      };
+
+      semaphore->acquire();
+      producer.send(std::move(message), callback);
+      std::cout << "Cached No." << i << " message" << std::endl;
     }
-  };
-
-  for (std::size_t i = 0; i < FLAGS_total; ++i) {
-    auto message = Message::newBuilder()
-                       .withTopic(FLAGS_topic)
-                       .withTag("TagA")
-                       .withKeys({"Key-" + std::to_string(i)})
-                       .withBody(body)
-                       .build();
-    semaphore->acquire();
-    producer.send(std::move(message), send_callback);
+  } catch (...) {
+    std::cerr << "Ah...No!!!" << std::endl;
   }
 
   {
@@ -178,4 +191,4 @@ int main(int argc, char* argv[]) {
   }
 
   return EXIT_SUCCESS;
-}
\ No newline at end of file
+}
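
The FIFO example above calls `semaphore->acquire()` before each `send` and `semaphore->release()` in the completion callback, which caps the number of in-flight sends at `FLAGS_concurrency`. The example's own `Semaphore` class lies outside the hunks shown here, so the following is only a minimal sketch of how such a counting semaphore could be written; `CountingSemaphore`, `tryAcquire`, and `available` are hypothetical names.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical counting semaphore; not the Semaphore class used by the example.
class CountingSemaphore {
public:
  explicit CountingSemaphore(std::size_t permits) : permits_(permits) {}

  // Blocks until a permit is available, then takes it.
  void acquire() {
    std::unique_lock<std::mutex> lk(mtx_);
    cv_.wait(lk, [this] { return permits_ > 0; });
    --permits_;
  }

  // Takes a permit if one is available; never blocks.
  bool tryAcquire() {
    std::lock_guard<std::mutex> lk(mtx_);
    if (permits_ == 0) {
      return false;
    }
    --permits_;
    return true;
  }

  // Returns a permit and wakes one waiter, if any.
  void release() {
    {
      std::lock_guard<std::mutex> lk(mtx_);
      ++permits_;
    }
    cv_.notify_one();
  }

  // Number of permits currently available.
  std::size_t available() {
    std::lock_guard<std::mutex> lk(mtx_);
    return permits_;
  }

private:
  std::mutex mtx_;
  std::condition_variable cv_;
  std::size_t permits_;
};
```

A producer loop would call `acquire()` before handing a message to the asynchronous `send`, and the completion callback would call `release()`, so the sender blocks once the configured number of sends is outstanding.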
diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp
index 57293c24..5e20cc12 100644
--- a/cpp/examples/ExampleProducer.cpp
+++ b/cpp/examples/ExampleProducer.cpp
@@ -57,6 +57,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -77,7 +78,7 @@ int main(int argc, char* argv[]) {
                       .withConfiguration(Configuration::newBuilder()
                                              .withEndpoints(FLAGS_access_point)
                                              .withCredentialsProvider(credentials_provider)
-                                             .withSsl(true)
+                                             .withSsl(FLAGS_tls)
                                              .build())
                       .withTopics({FLAGS_topic})
                       .build();
@@ -88,8 +89,8 @@ int main(int argc, char* argv[]) {
   auto stats_lambda = [&] {
     while (!stopped.load(std::memory_order_relaxed)) {
       long cnt = count.load(std::memory_order_relaxed);
-      while (count.compare_exchange_weak(cnt, 0)) {
-        break;
+      while (!count.compare_exchange_weak(cnt, 0)) {
+        cnt = count.load(std::memory_order_relaxed);
       }
       std::this_thread::sleep_for(std::chrono::seconds(1));
       std::cout << "QPS: " << cnt << std::endl;
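
The stats lambda fix above replaces a loop that gave up after a single failed `compare_exchange_weak` with one that retries until the counter is actually swapped to zero. As a minimal sketch of the same read-and-reset idea (the helper names `drain` and `drainWithCas` are hypothetical and not part of the examples), it can be written either with `exchange` or with a CAS loop:

```cpp
#include <atomic>

// Returns the value held by `counter` and resets it to zero in one atomic step.
inline long drain(std::atomic_long& counter) {
  return counter.exchange(0, std::memory_order_relaxed);
}

// Equivalent CAS loop, close in shape to the patched examples.
inline long drainWithCas(std::atomic_long& counter) {
  long observed = counter.load(std::memory_order_relaxed);
  // On failure, compare_exchange_weak writes the current value into `observed`,
  // so the loop retries with fresh data until the swap to zero succeeds.
  while (!counter.compare_exchange_weak(observed, 0)) {
  }
  return observed;
}
```

Because a failed `compare_exchange_weak` already refreshes its `expected` argument, the explicit reload inside the patched loop is harmless but not strictly required.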
diff --git a/cpp/examples/ExampleProducerWithAsync.cpp b/cpp/examples/ExampleProducerWithAsync.cpp
index 62ee7781..d88dfc85 100644
--- a/cpp/examples/ExampleProducerWithAsync.cpp
+++ b/cpp/examples/ExampleProducerWithAsync.cpp
@@ -17,7 +17,6 @@
 #include <algorithm>
 #include <atomic>
 #include <condition_variable>
-#include <cstdint>
 #include <iostream>
 #include <mutex>
 #include <random>
@@ -97,6 +96,7 @@ DEFINE_uint32(total, 256, "Number of sample messages to publish");
 DEFINE_uint32(concurrency, 128, "Concurrency of async send");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -116,7 +116,7 @@ int main(int argc, char* argv[]) {
                       .withConfiguration(Configuration::newBuilder()
                                              .withEndpoints(FLAGS_access_point)
                                              .withCredentialsProvider(credentials_provider)
-                                             .withSsl(true)
+                                             .withSsl(FLAGS_tls)
                                              .build())
                       .withTopics({FLAGS_topic})
                       .build();
@@ -127,8 +127,8 @@ int main(int argc, char* argv[]) {
   auto stats_lambda = [&] {
     while (!stopped.load(std::memory_order_relaxed)) {
       long cnt = count.load(std::memory_order_relaxed);
-      while (count.compare_exchange_weak(cnt, 0)) {
-        break;
+      while (!count.compare_exchange_weak(cnt, 0)) {
+        cnt = count.load(std::memory_order_relaxed);
       }
       std::this_thread::sleep_for(std::chrono::seconds(1));
       std::cout << "QPS: " << cnt << std::endl;
diff --git a/cpp/examples/ExampleProducerWithFifoMessage.cpp b/cpp/examples/ExampleProducerWithFifoMessage.cpp
index 09b8d407..4fa34f9d 100644
--- a/cpp/examples/ExampleProducerWithFifoMessage.cpp
+++ b/cpp/examples/ExampleProducerWithFifoMessage.cpp
@@ -54,6 +54,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -74,7 +75,7 @@ int main(int argc, char* argv[]) {
                       .withConfiguration(Configuration::newBuilder()
                                              .withEndpoints(FLAGS_access_point)
                                              .withCredentialsProvider(credentials_provider)
-                                             .withSsl(true)
+                                             .withSsl(FLAGS_tls)
                                              .build())
                       .withTopics({FLAGS_topic})
                       .build();
@@ -83,10 +84,11 @@ int main(int argc, char* argv[]) {
   std::atomic_long count(0);
 
   auto stats_lambda = [&] {
+    std::cout << "Stats thread starts" << std::endl;
     while (!stopped.load(std::memory_order_relaxed)) {
       long cnt = count.load(std::memory_order_relaxed);
-      while (count.compare_exchange_weak(cnt, 0)) {
-        break;
+      while (!count.compare_exchange_weak(cnt, 0)) {
+        cnt = count.load(std::memory_order_relaxed);
       }
       std::this_thread::sleep_for(std::chrono::seconds(1));
       std::cout << "QPS: " << cnt << std::endl;
@@ -109,7 +111,7 @@ int main(int argc, char* argv[]) {
                          .build();
       std::error_code ec;
       SendReceipt send_receipt = producer.send(std::move(message), ec);
-      std::cout << "Message-ID: " << send_receipt.message_id << std::endl;
+      // std::cout << "Message-ID: " << send_receipt.message_id << std::endl;
       count++;
     }
   } catch (...) {
diff --git a/cpp/examples/ExampleProducerWithTimedMessage.cpp b/cpp/examples/ExampleProducerWithTimedMessage.cpp
index 8f12f5b6..d6237459 100644
--- a/cpp/examples/ExampleProducerWithTimedMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp
@@ -56,6 +56,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -75,7 +76,7 @@ int main(int argc, char* argv[]) {
                       .withConfiguration(Configuration::newBuilder()
                                              .withEndpoints(FLAGS_access_point)
                                              .withCredentialsProvider(credentials_provider)
-                                             .withSsl(true)
+                                             .withSsl(FLAGS_tls)
                                              .build())
                       .withTopics({FLAGS_topic})
                       .build();
@@ -86,8 +87,8 @@ int main(int argc, char* argv[]) {
   auto stats_lambda = [&] {
     while (!stopped.load(std::memory_order_relaxed)) {
       long cnt = count.load(std::memory_order_relaxed);
-      while (count.compare_exchange_weak(cnt, 0)) {
-        break;
+      while (!count.compare_exchange_weak(cnt, 0)) {
+        cnt = count.load(std::memory_order_relaxed);
       }
       std::this_thread::sleep_for(std::chrono::seconds(1));
       std::cout << "QPS: " << cnt << std::endl;
diff --git a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
index befb18ca..50620c5a 100644
--- a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
@@ -54,6 +54,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -79,7 +80,7 @@ int main(int argc, char* argv[]) {
                       .withConfiguration(Configuration::newBuilder()
                                              .withEndpoints(FLAGS_access_point)
                                              .withCredentialsProvider(credentials_provider)
-                                             .withSsl(true)
+                                             .withSsl(FLAGS_tls)
                                              .build())
                       .withTopics({FLAGS_topic})
                       .withTransactionChecker(checker)
@@ -91,8 +92,8 @@ int main(int argc, char* argv[]) {
   auto stats_lambda = [&] {
     while (!stopped.load(std::memory_order_relaxed)) {
       long cnt = count.load(std::memory_order_relaxed);
-      while (count.compare_exchange_weak(cnt, 0)) {
-        break;
+      while (!count.compare_exchange_weak(cnt, 0)) {
+        cnt = count.load(std::memory_order_relaxed);
       }
       std::this_thread::sleep_for(std::chrono::seconds(1));
       std::cout << "QPS: " << cnt << std::endl;
diff --git a/cpp/examples/ExamplePushConsumer.cpp b/cpp/examples/ExamplePushConsumer.cpp
index 1e20b2ee..66a85f4b 100644
--- a/cpp/examples/ExamplePushConsumer.cpp
+++ b/cpp/examples/ExamplePushConsumer.cpp
@@ -16,7 +16,6 @@
  */
 #include <chrono>
 #include <iostream>
-#include <mutex>
 #include <thread>
 
 #include "gflags/gflags.h"
@@ -30,6 +29,7 @@ DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provide
 DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -58,7 +58,7 @@ int main(int argc, char* argv[]) {
                                                   .withEndpoints(FLAGS_access_point)
                                                   .withRequestTimeout(std::chrono::seconds(3))
                                                   .withCredentialsProvider(credentials_provider)
-                                                  .withSsl(true)
+                                                  .withSsl(FLAGS_tls)
                                                   .build())
                            .withConsumeThreads(4)
                            .withListener(listener)
diff --git a/cpp/examples/ExampleSimpleConsumer.cpp b/cpp/examples/ExampleSimpleConsumer.cpp
index 4c30214f..aedec71e 100644
--- a/cpp/examples/ExampleSimpleConsumer.cpp
+++ b/cpp/examples/ExampleSimpleConsumer.cpp
@@ -16,7 +16,6 @@
  */
 #include <chrono>
 #include <iostream>
-#include <thread>
 
 #include "gflags/gflags.h"
 #include "rocketmq/Logger.h"
@@ -29,6 +28,7 @@ DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provide
 DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
 
 int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -51,7 +51,7 @@ int main(int argc, char* argv[]) {
                              .withConfiguration(Configuration::newBuilder()
                                                     .withEndpoints(FLAGS_access_point)
                                                     .withCredentialsProvider(credentials_provider)
-                                                    .withSsl(true)
+                                                    .withSsl(FLAGS_tls)
                                                     .build())
                              .subscribe(FLAGS_topic, tag)
                              .build();
diff --git a/cpp/include/rocketmq/Configuration.h b/cpp/include/rocketmq/Configuration.h
index 0037c270..6dcd4137 100644
--- a/cpp/include/rocketmq/Configuration.h
+++ b/cpp/include/rocketmq/Configuration.h
@@ -44,7 +44,7 @@ public:
   }
 
   bool withSsl() const {
-    return withSsl_;
+    return tls_;
   }
 
 protected:
@@ -56,7 +56,7 @@ private:
   std::string               endpoints_;
   CredentialsProviderPtr    credentials_provider_;
   std::chrono::milliseconds request_timeout_{ConfigurationDefaults::RequestTimeout};
-  bool withSsl_ = true;
+  bool tls_ = true;
 };
 
 class ConfigurationBuilder {
@@ -67,7 +67,7 @@ public:
 
   ConfigurationBuilder& withRequestTimeout(std::chrono::milliseconds request_timeout);
 
-  ConfigurationBuilder& withSsl(bool enable);
+  ConfigurationBuilder& withSsl(bool with_ssl);
 
   Configuration build();
 
diff --git a/cpp/include/rocketmq/FifoProducer.h b/cpp/include/rocketmq/FifoProducer.h
new file mode 100644
index 00000000..1ee20d91
--- /dev/null
+++ b/cpp/include/rocketmq/FifoProducer.h
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "Configuration.h"
+#include "Message.h"
+#include "RocketMQ.h"
+#include "SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class FifoProducerImpl;
+class FifoProducerBuilder;
+class ProducerImpl;
+
+class FifoProducer {
+public:
+  static FifoProducerBuilder newBuilder();
+
+  void send(MessageConstPtr message, SendCallback callback);
+
+private:
+  std::shared_ptr<FifoProducerImpl> impl_;
+
+  explicit FifoProducer(std::shared_ptr<FifoProducerImpl> impl) : impl_(std::move(impl)) {
+  }
+
+  void start();
+
+  friend class FifoProducerBuilder;
+};
+
+class FifoProducerBuilder {
+public:
+  FifoProducerBuilder();
+
+  FifoProducerBuilder& withConfiguration(Configuration configuration);
+
+  FifoProducerBuilder& withTopics(const std::vector<std::string>& topics);
+
+  FifoProducerBuilder& withConcurrency(std::size_t concurrency);
+
+  FifoProducer build();
+
+private:
+  std::shared_ptr<FifoProducerImpl> impl_;
+  std::shared_ptr<ProducerImpl> producer_impl_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h
index 42004eb3..6b42843d 100644
--- a/cpp/include/rocketmq/Producer.h
+++ b/cpp/include/rocketmq/Producer.h
@@ -16,20 +16,17 @@
  */
 #pragma once
 
-#include <chrono>
-#include <functional>
 #include <memory>
 #include <system_error>
 #include <vector>
 
 #include "Configuration.h"
-#include "ErrorCode.h"
-#include "Logger.h"
 #include "Message.h"
 #include "SendCallback.h"
 #include "SendReceipt.h"
 #include "Transaction.h"
 #include "TransactionChecker.h"
+#include "rocketmq/Logger.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/include/rocketmq/SendReceipt.h
index 489df5ec..7eef6e79 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/include/rocketmq/SendReceipt.h
@@ -16,20 +16,21 @@
  */
 #pragma once
 
-#include <cstdint>
 #include <string>
-#include <utility>
 
 #include "RocketMQ.h"
+#include "rocketmq/Message.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
 struct SendReceipt {
+  std::string target;
+
   std::string message_id;
 
   std::string transaction_id;
 
-  std::string target;
+  MessageConstPtr message;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/CMakeLists.txt b/cpp/source/CMakeLists.txt
index d42c6a1a..dbf5ab01 100644
--- a/cpp/source/CMakeLists.txt
+++ b/cpp/source/CMakeLists.txt
@@ -53,7 +53,10 @@ target_link_libraries(rocketmq_shared
             opencensus::stats
             opencensus_proto
             spdlog)
+set(VERSION_SCRIPT ${CMAKE_CURRENT_SOURCE_DIR}/exports.map)
+set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -Wl,--version-script=${VERSION_SCRIPT}")
 set_target_properties(rocketmq_shared
         PROPERTIES
+            LINK_DEPENDS ${VERSION_SCRIPT}
             LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}
             LIBRARY_OUTPUT_NAME rocketmq)
\ No newline at end of file
diff --git a/cpp/source/base/Configuration.cpp b/cpp/source/base/Configuration.cpp
index 2a136d5d..66cff2e8 100644
--- a/cpp/source/base/Configuration.cpp
+++ b/cpp/source/base/Configuration.cpp
@@ -38,8 +38,8 @@ ConfigurationBuilder& ConfigurationBuilder::withRequestTimeout(std::chrono::mill
   return *this;
 }
 
-ConfigurationBuilder& ConfigurationBuilder::withSsl(bool enable) {
-  configuration_.withSsl_ = enable;
+ConfigurationBuilder& ConfigurationBuilder::withSsl(bool with_ssl) {
+  configuration_.tls_ = with_ssl;
   return *this;
 }
 
diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp
index 5865dbb2..7d724c7b 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -24,34 +24,30 @@
 #include <utility>
 #include <vector>
 
-#include "apache/rocketmq/v2/definition.pb.h"
 #include "InvocationContext.h"
 #include "LogInterceptor.h"
 #include "LogInterceptorFactory.h"
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
-#include "MessageExt.h"
-#include "MetadataConstants.h"
 #include "MixAll.h"
 #include "Protocol.h"
 #include "ReceiveMessageContext.h"
 #include "RpcClient.h"
 #include "RpcClientImpl.h"
 #include "Scheduler.h"
-#include "TlsHelper.h"
+#include "SchedulerImpl.h"
 #include "UtilAll.h"
 #include "google/protobuf/util/time_util.h"
 #include "grpcpp/create_channel.h"
 #include "rocketmq/ErrorCode.h"
-#include "rocketmq/SendReceipt.h"
+#include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool withSsl)
-    : scheduler_(std::make_shared<SchedulerImpl>()), resource_namespace_(std::move(resource_namespace)),
+ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_ssl)
+    : scheduler_(std::make_shared<SchedulerImpl>()),
+      resource_namespace_(std::move(resource_namespace)),
       state_(State::CREATED),
       callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
-      withSsl_(withSsl){
+      with_ssl_(with_ssl) {
   certificate_verifier_ = grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>();
   tls_channel_credential_options_.set_verify_server_certs(false);
   tls_channel_credential_options_.set_check_call_host(false);
@@ -175,8 +171,10 @@ std::vector<std::string> ClientManagerImpl::cleanOfflineRpcClients() {
   return removed;
 }
 
-void ClientManagerImpl::heartbeat(const std::string& target_host, const Metadata& metadata,
-                                  const HeartbeatRequest& request, std::chrono::milliseconds timeout,
+void ClientManagerImpl::heartbeat(const std::string& target_host,
+                                  const Metadata& metadata,
+                                  const HeartbeatRequest& request,
+                                  std::chrono::milliseconds timeout,
                                   const std::function<void(const std::error_code&, const HeartbeatResponse&)>& cb) {
   SPDLOG_DEBUG("Prepare to send heartbeat to {}. Request: {}", target_host, request.DebugString());
   auto client = getRpcClient(target_host, true);
@@ -279,8 +277,10 @@ void ClientManagerImpl::doHeartbeat() {
   }
 }
 
-bool ClientManagerImpl::send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request,
-                             SendCallback cb) {
+bool ClientManagerImpl::send(const std::string& target_host,
+                             const Metadata& metadata,
+                             SendMessageRequest& request,
+                             SendResultCallback cb) {
   assert(cb);
   SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.DebugString());
   RpcClientSharedPtr client = getRpcClient(target_host);
@@ -306,15 +306,14 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met
       return;
     }
 
-    SendReceipt send_receipt = {};
-    send_receipt.target = target_host;
-    std::error_code ec;
+    SendResult send_result = {};
+    send_result.target = target_host;
     if (!invocation_context->status.ok()) {
       SPDLOG_WARN("Failed to send message to {} due to gRPC error. gRPC code: {}, gRPC error message: {}",
                   invocation_context->remote_address, invocation_context->status.error_code(),
                   invocation_context->status.error_message());
-      ec = ErrorCode::RequestTimeout;
-      cb(ec, send_receipt);
+      send_result.ec = ErrorCode::RequestTimeout;
+      cb(send_result);
       return;
     }
 
@@ -323,8 +322,8 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met
       case rmq::Code::OK: {
         if (!invocation_context->response.entries().empty()) {
           auto first = invocation_context->response.entries().begin();
-          send_receipt.message_id = first->message_id();
-          send_receipt.transaction_id = first->transaction_id();
+          send_result.message_id = first->message_id();
+          send_result.transaction_id = first->transaction_id();
         } else {
           SPDLOG_ERROR("Unexpected send-message-response: {}", invocation_context->response.DebugString());
         }
@@ -333,126 +332,127 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met
 
       case rmq::Code::ILLEGAL_TOPIC: {
         SPDLOG_ERROR("IllegalTopic: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::IllegalTopic;
+        send_result.ec = ErrorCode::IllegalTopic;
         break;
       }
 
       case rmq::Code::ILLEGAL_MESSAGE_TAG: {
         SPDLOG_ERROR("IllegalMessageTag: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::IllegalMessageTag;
+        send_result.ec = ErrorCode::IllegalMessageTag;
         break;
       }
 
       case rmq::Code::ILLEGAL_MESSAGE_KEY: {
         SPDLOG_ERROR("IllegalMessageKey: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::IllegalMessageKey;
+        send_result.ec = ErrorCode::IllegalMessageKey;
         break;
       }
 
       case rmq::Code::ILLEGAL_MESSAGE_GROUP: {
         SPDLOG_ERROR("IllegalMessageGroup: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::IllegalMessageGroup;
+        send_result.ec = ErrorCode::IllegalMessageGroup;
         break;
       }
 
       case rmq::Code::ILLEGAL_MESSAGE_PROPERTY_KEY: {
         SPDLOG_ERROR("IllegalMessageProperty: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::IllegalMessageProperty;
+        send_result.ec = ErrorCode::IllegalMessageProperty;
         break;
       }
 
       case rmq::Code::MESSAGE_PROPERTIES_TOO_LARGE: {
         SPDLOG_ERROR("MessagePropertiesTooLarge: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::MessagePropertiesTooLarge;
+        send_result.ec = ErrorCode::MessagePropertiesTooLarge;
         break;
       }
 
       case rmq::Code::MESSAGE_BODY_TOO_LARGE: {
         SPDLOG_ERROR("MessageBodyTooLarge: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::MessageBodyTooLarge;
+        send_result.ec = ErrorCode::MessageBodyTooLarge;
         break;
       }
 
       case rmq::Code::TOPIC_NOT_FOUND: {
         SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::TopicNotFound;
+        send_result.ec = ErrorCode::TopicNotFound;
         break;
       }
 
       case rmq::Code::NOT_FOUND: {
         SPDLOG_WARN("NotFound: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::NotFound;
+        send_result.ec = ErrorCode::NotFound;
         break;
       }
 
       case rmq::Code::UNAUTHORIZED: {
         SPDLOG_WARN("Unauthenticated: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::Unauthorized;
+        send_result.ec = ErrorCode::Unauthorized;
         break;
       }
-        
+
       case rmq::Code::FORBIDDEN: {
         SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::Forbidden;
+        send_result.ec = ErrorCode::Forbidden;
         break;
       }
 
       case rmq::Code::MESSAGE_CORRUPTED: {
         SPDLOG_WARN("MessageCorrupted: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::MessageCorrupted;
+        send_result.ec = ErrorCode::MessageCorrupted;
         break;
       }
 
       case rmq::Code::TOO_MANY_REQUESTS: {
         SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::TooManyRequests;
+        send_result.ec = ErrorCode::TooManyRequests;
         break;
       }
 
       case rmq::Code::INTERNAL_SERVER_ERROR: {
         SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::InternalServerError;
+        send_result.ec = ErrorCode::InternalServerError;
         break;
       }
 
       case rmq::Code::HA_NOT_AVAILABLE: {
         SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::InternalServerError;
+        send_result.ec = ErrorCode::InternalServerError;
         break;
       }
 
       case rmq::Code::PROXY_TIMEOUT: {
         SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::GatewayTimeout;
+        send_result.ec = ErrorCode::GatewayTimeout;
         break;
       }
 
       case rmq::Code::MASTER_PERSISTENCE_TIMEOUT: {
         SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::GatewayTimeout;
+        send_result.ec = ErrorCode::GatewayTimeout;
         break;
       }
 
       case rmq::Code::SLAVE_PERSISTENCE_TIMEOUT: {
         SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::GatewayTimeout;
+        send_result.ec = ErrorCode::GatewayTimeout;
         break;
       }
 
       case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: {
-        SPDLOG_WARN("Message-property-conflict-with-type: Host={}, Response={}", invocation_context->remote_address, invocation_context->response.DebugString());
-        ec = ErrorCode::MessagePropertyConflictWithType;
+        SPDLOG_WARN("Message-property-conflict-with-type: Host={}, Response={}", invocation_context->remote_address,
+                    invocation_context->response.DebugString());
+        send_result.ec = ErrorCode::MessagePropertyConflictWithType;
         break;
       }
 
       default: {
         SPDLOG_WARN("NotSupported: Check and upgrade SDK to the latest. Host={}", invocation_context->remote_address);
-        ec = ErrorCode::NotSupported;
+        send_result.ec = ErrorCode::NotSupported;
         break;
       }
     }
 
-    cb(ec, send_receipt);
+    cb(send_result);
   };
 
   invocation_context->callback = completion_callback;
@@ -470,7 +470,8 @@ std::shared_ptr<grpc::Channel> ClientManagerImpl::createChannel(const std::strin
   std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories;
   interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
   auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
-      target_host, withSsl_ ? channel_credential_ : grpc::InsecureChannelCredentials(), channel_arguments_, std::move(interceptor_factories));
+      target_host, with_ssl_ ? channel_credential_ : grpc::InsecureChannelCredentials(), channel_arguments_,
+      std::move(interceptor_factories));
   return channel;
 }
 
@@ -513,27 +514,28 @@ void ClientManagerImpl::cleanRpcClients() {
   rpc_clients_.clear();
 }
 
-SendReceipt ClientManagerImpl::processSendResponse(const rmq::MessageQueue& message_queue,
-                                                   const SendMessageResponse& response, std::error_code& ec) {
-  SendReceipt send_receipt;
+SendResult ClientManagerImpl::processSendResponse(const rmq::MessageQueue& message_queue,
+                                                  const SendMessageResponse& response,
+                                                  std::error_code& ec) {
+  SendResult send_result;
 
   switch (response.status().code()) {
     case rmq::Code::OK: {
       assert(response.entries_size() > 0);
-      send_receipt.message_id = response.entries().begin()->message_id();
-      send_receipt.transaction_id = response.entries().begin()->transaction_id();
-      return send_receipt;
+      send_result.message_id = response.entries().begin()->message_id();
+      send_result.transaction_id = response.entries().begin()->transaction_id();
+      return send_result;
     }
     case rmq::Code::ILLEGAL_TOPIC: {
       ec = ErrorCode::BadRequest;
-      return send_receipt;
+      return send_result;
     }
     default: {
       // TODO: handle other cases.
       break;
     }
   }
-  return send_receipt;
+  return send_result;
 }
 
 void ClientManagerImpl::addClientObserver(std::weak_ptr<Client> client) {
@@ -541,8 +543,10 @@ void ClientManagerImpl::addClientObserver(std::weak_ptr<Client> client) {
   clients_.emplace_back(std::move(client));
 }
 
-void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metadata& metadata,
-                                     const QueryRouteRequest& request, std::chrono::milliseconds timeout,
+void ClientManagerImpl::resolveRoute(const std::string& target_host,
+                                     const Metadata& metadata,
+                                     const QueryRouteRequest& request,
+                                     std::chrono::milliseconds timeout,
                                      const std::function<void(const std::error_code&, const TopicRouteDataPtr&)>& cb) {
   SPDLOG_DEBUG("Name server connection URL: {}", target_host);
   SPDLOG_DEBUG("Query route request: {}", request.DebugString());
@@ -646,7 +650,9 @@ void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metad
 }
 
 void ClientManagerImpl::queryAssignment(
-    const std::string& target, const Metadata& metadata, const QueryAssignmentRequest& request,
+    const std::string& target,
+    const Metadata& metadata,
+    const QueryAssignmentRequest& request,
     std::chrono::milliseconds timeout,
     const std::function<void(const std::error_code&, const QueryAssignmentResponse&)>& cb) {
   SPDLOG_DEBUG("Prepare to send query assignment request to broker[address={}]", target);
@@ -748,8 +754,10 @@ void ClientManagerImpl::queryAssignment(
   client->asyncQueryAssignment(request, invocation_context);
 }
 
-void ClientManagerImpl::receiveMessage(const std::string& target_host, const Metadata& metadata,
-                                       const ReceiveMessageRequest& request, std::chrono::milliseconds timeout,
+void ClientManagerImpl::receiveMessage(const std::string& target_host,
+                                       const Metadata& metadata,
+                                       const ReceiveMessageRequest& request,
+                                       std::chrono::milliseconds timeout,
                                        ReceiveMessageCallback cb) {
   SPDLOG_DEBUG("Prepare to receive message from {} asynchronously. Request: {}", target_host, request.DebugString());
   RpcClientSharedPtr client = getRpcClient(target_host);
@@ -765,7 +773,6 @@ State ClientManagerImpl::state() const {
 }
 
 MessageConstSharedPtr ClientManagerImpl::wrapMessage(const rmq::Message& item) {
-  assert(item.topic().resource_namespace() == resource_namespace_);
   auto builder = Message::newBuilder();
 
   // base
@@ -955,8 +962,11 @@ SchedulerSharedPtr ClientManagerImpl::getScheduler() {
   return scheduler_;
 }
 
-void ClientManagerImpl::ack(const std::string& target, const Metadata& metadata, const AckMessageRequest& request,
-                            std::chrono::milliseconds timeout, const std::function<void(const std::error_code&)>& cb) {
+void ClientManagerImpl::ack(const std::string& target,
+                            const Metadata& metadata,
+                            const AckMessageRequest& request,
+                            std::chrono::milliseconds timeout,
+                            const std::function<void(const std::error_code&)>& cb) {
   std::string target_host(target.data(), target.length());
   SPDLOG_DEBUG("Prepare to ack message against {} asynchronously. AckMessageRequest: {}", target_host,
                request.DebugString());
@@ -1066,8 +1076,11 @@ void ClientManagerImpl::ack(const std::string& target, const Metadata& metadata,
 }
 
 void ClientManagerImpl::changeInvisibleDuration(
-    const std::string& target_host, const Metadata& metadata, const ChangeInvisibleDurationRequest& request,
-    std::chrono::milliseconds timeout, const std::function<void(const std::error_code&)>& completion_callback) {
+    const std::string& target_host,
+    const Metadata& metadata,
+    const ChangeInvisibleDurationRequest& request,
+    std::chrono::milliseconds timeout,
+    const std::function<void(const std::error_code&)>& completion_callback) {
   RpcClientSharedPtr client = getRpcClient(target_host);
   assert(client);
   auto invocation_context = new InvocationContext<ChangeInvisibleDurationResponse>();
@@ -1133,7 +1146,7 @@ void ClientManagerImpl::changeInvisibleDuration(
         ec = ErrorCode::Forbidden;
         break;
       }
-        
+
       case rmq::Code::INTERNAL_SERVER_ERROR: {
         SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::InternalServerError;
@@ -1159,7 +1172,9 @@ void ClientManagerImpl::changeInvisibleDuration(
 }
 
 void ClientManagerImpl::endTransaction(
-    const std::string& target_host, const Metadata& metadata, const EndTransactionRequest& request,
+    const std::string& target_host,
+    const Metadata& metadata,
+    const EndTransactionRequest& request,
     std::chrono::milliseconds timeout,
     const std::function<void(const std::error_code&, const EndTransactionResponse&)>& cb) {
   RpcClientSharedPtr client = getRpcClient(target_host);
@@ -1339,7 +1354,7 @@ void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe
         ec = ErrorCode::ServiceUnavailable;
         break;
       }
-        
+
       case rmq::Code::TOO_MANY_REQUESTS: {
         ec = ErrorCode::TooManyRequests;
         break;
@@ -1362,7 +1377,8 @@ void 
ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe
   client->asyncForwardMessageToDeadLetterQueue(request, invocation_context);
 }
 
-std::error_code ClientManagerImpl::notifyClientTermination(const std::string& 
target_host, const Metadata& metadata,
+std::error_code ClientManagerImpl::notifyClientTermination(const std::string& 
target_host,
+                                                           const Metadata& 
metadata,
                                                            const 
NotifyClientTerminationRequest& request,
                                                            
std::chrono::milliseconds timeout) {
   std::error_code ec;
@@ -1446,4 +1462,4 @@ void ClientManagerImpl::submit(std::function<void()> 
task) {
 const char* ClientManagerImpl::HEARTBEAT_TASK_NAME = "heartbeat-task";
 const char* ClientManagerImpl::STATS_TASK_NAME = "stats-task";
 
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/client/SessionImpl.cpp 
b/cpp/source/client/SessionImpl.cpp
index 0ca3fff2..36416829 100644
--- a/cpp/source/client/SessionImpl.cpp
+++ b/cpp/source/client/SessionImpl.cpp
@@ -16,12 +16,14 @@
  */
 
 #include "SessionImpl.h"
+
 #include "rocketmq/Logger.h"
 #include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-SessionImpl::SessionImpl(std::weak_ptr<Client> client, 
std::shared_ptr<RpcClient> rpc_client) : client_(client), 
rpc_client_(rpc_client) {
+SessionImpl::SessionImpl(std::weak_ptr<Client> client, 
std::shared_ptr<RpcClient> rpc_client)
+    : client_(client), rpc_client_(rpc_client) {
   telemetry_ = rpc_client->asyncTelemetry(client_);
   syncSettings();
 }
@@ -39,8 +41,8 @@ void SessionImpl::syncSettings() {
 }
 
 SessionImpl::~SessionImpl() {
-  SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress());
   telemetry_->fireClose();
+  SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress());
 }
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp 
b/cpp/source/client/TelemetryBidiReactor.cpp
index 6c5b2f93..d75ac361 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -22,21 +22,24 @@
 #include <utility>
 
 #include "ClientManager.h"
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
 #include "MessageExt.h"
 #include "Metadata.h"
 #include "RpcClient.h"
 #include "Signature.h"
 #include "google/protobuf/util/time_util.h"
+#include "rocketmq/Logger.h"
+#include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
 TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
                                            rmq::MessagingService::Stub* stub,
                                            std::string peer_address)
-    : client_(client), peer_address_(std::move(peer_address)), 
stream_state_(StreamState::Created) {
-  auto ptr = client.lock();
+    : client_(client),
+      peer_address_(std::move(peer_address)),
+      read_state_(StreamState::Created),
+      write_state_(StreamState::Created) {
+  auto ptr = client_.lock();
   auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1);
   context_.set_deadline(deadline);
   Metadata metadata;
@@ -45,69 +48,126 @@ 
TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
     context_.AddMetadata(entry.first, entry.second);
   }
   stub->async()->Telemetry(&context_, this);
+  write_state_ = StreamState::Ready;
+  // Increase hold for write stream.
+  AddHold();
   StartCall();
 }
 
 TelemetryBidiReactor::~TelemetryBidiReactor() {
-  SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}", 
peer_address_,
-              static_cast<std::uint8_t>(stream_state_));
+  SPDLOG_INFO("Telemetry stream for {} destructed. ReadStreamState={}, 
WriteStreamState={}", peer_address_,
+              static_cast<std::uint8_t>(read_state_), 
static_cast<std::uint8_t>(write_state_));
 }
 
 bool TelemetryBidiReactor::await() {
-  absl::MutexLock lk(&server_setting_received_mtx_);
-  if (server_setting_received_) {
+  absl::MutexLock lk(&state_mtx_);
+  if (StreamState::Created != write_state_) {
     return true;
   }
 
-  server_setting_received_cv_.Wait(&server_setting_received_mtx_);
-  return server_setting_received_;
+  state_cv_.Wait(&state_mtx_);
+  return StreamState::Error != write_state_;
 }
 
 void TelemetryBidiReactor::OnWriteDone(bool ok) {
-  SPDLOG_DEBUG("OnWriteDone: {}", ok);
-
-  {
-    bool expect = true;
-    if (!command_inflight_.compare_exchange_strong(expect, false, 
std::memory_order_relaxed)) {
-      SPDLOG_WARN("Illegal command-inflight state");
-    }
-  }
+  SPDLOG_DEBUG("{}#OnWriteDone", peer_address_);
 
   if (!ok) {
-    SPDLOG_WARN("Failed to write telemetry command {} to {}", 
write_.DebugString(), peer_address_);
+    RemoveHold();
     {
-      absl::MutexLock lk(&stream_state_mtx_);
-      stream_state_ = StreamState::WriteDone;
-    }
+      absl::MutexLock lk(&state_mtx_);
+      SPDLOG_WARN("Failed to write telemetry command {} to {}", 
writes_.front().DebugString(), peer_address_);
+      write_state_ = StreamState::Error;
+
+      // Sync read state.
+      switch (read_state_) {
+        case StreamState::Created:
+        case StreamState::Ready: {
+          SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                       static_cast<std::uint8_t>(StreamState::Closed));
+          read_state_ = StreamState::Closed;
+          break;
+        }
+        case StreamState::Inflight: {
+          SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                       static_cast<std::uint8_t>(StreamState::Closing));
+          read_state_ = StreamState::Closing;
+          break;
+        }
+        case StreamState::Closing:
+        case StreamState::Error:
+        case StreamState::Closed: {
+          break;
+        }
+      }
 
-    fireClose();
+      state_cv_.SignalAll();
+    }
     return;
+  } else {
+    absl::MutexLock lk(&state_mtx_);
+    if (StreamState::Inflight == write_state_) {
+      write_state_ = StreamState::Ready;
+    }
   }
 
+  // Check if the read stream has started.
+  fireRead();
+
+  // Remove the command that has been written to server.
   {
-    absl::MutexLock lk(&stream_state_mtx_);
-    if (StreamState::Created == stream_state_) {
-      stream_state_ = StreamState::Active;
-      fireRead();
-    }
+    absl::MutexLock lk(&writes_mtx_);
+    writes_.pop_front();
   }
 
-  fireWrite();
+  tryWriteNext();
 }
 
 void TelemetryBidiReactor::OnReadDone(bool ok) {
-  SPDLOG_DEBUG("OnReadDone: ok={}", ok);
-  if (!ok) {
-    if (client_.lock()) {
-      SPDLOG_WARN("Failed to read telemetry command from {}", peer_address_);
-    }
-
-    {
-      absl::MutexLock lk(&stream_state_mtx_);
-      stream_state_ = StreamState::ReadDone;
+  SPDLOG_DEBUG("{}#OnReadDone", peer_address_);
+  {
+    absl::MutexLock lk(&state_mtx_);
+    if (!ok) {
+      // Remove read hold.
+      RemoveHold();
+      {
+        SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                     static_cast<std::uint8_t>(StreamState::Error));
+        read_state_ = StreamState::Error;
+        SPDLOG_WARN("Failed to read from telemetry stream from {}", 
peer_address_);
+
+        // Sync write state
+        switch (write_state_) {
+          case StreamState::Created: {
+            // Not reachable
+            break;
+          }
+          case StreamState::Ready: {
+            write_state_ = StreamState::Closed;
+            // There is no inflight write request, remove write hold on its 
behalf.
+            RemoveHold();
+            state_cv_.SignalAll();
+            break;
+          }
+          case StreamState::Inflight: {
+            write_state_ = StreamState::Closing;
+            break;
+          }
+          case StreamState::Closing:
+          case StreamState::Error:
+          case StreamState::Closed: {
+            break;
+          }
+        }
+      }
+      return;
+    } else if (StreamState::Closing == read_state_) {
+      SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                   static_cast<std::uint8_t>(StreamState::Closed));
+      read_state_ = StreamState::Closed;
+      state_cv_.SignalAll();
+      return;
     }
-    fireClose();
-    return;
   }
 
   SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, 
read_.DebugString());
@@ -122,13 +182,6 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
       auto settings = read_.settings();
       SPDLOG_INFO("Received settings from {}: {}", peer_address_, 
settings.DebugString());
       applySettings(settings);
-      {
-        absl::MutexLock lk(&server_setting_received_mtx_);
-        if (!server_setting_received_) {
-          server_setting_received_ = true;
-          server_setting_received_cv_.SignalAll();
-        }
-      }
       break;
     }
     case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
@@ -151,11 +204,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
       TelemetryCommand response;
       
response.mutable_thread_stack_trace()->set_nonce(read_.print_thread_stack_trace_command().nonce());
       
response.mutable_thread_stack_trace()->set_thread_stack_trace("PrintStackTrace 
is not supported");
-      {
-        absl::MutexLock lk(&writes_mtx_);
-        writes_.push_back(response);
-      }
-      fireWrite();
+      write(std::move(response));
       break;
     }
 
@@ -188,7 +237,18 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
     }
   }
 
-  fireRead();
+  {
+    absl::MutexLock lk(&state_mtx_);
+    if (StreamState::Inflight == read_state_) {
+      SPDLOG_DEBUG("Spawn new read op, read-state={}", 
static_cast<std::uint8_t>(read_state_));
+      StartRead(&read_);
+    } else if (read_state_ == StreamState::Closing) {
+      SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                   static_cast<std::uint8_t>(StreamState::Closed));
+      read_state_ = StreamState::Closed;
+      state_cv_.SignalAll();
+    }
+  }
 }
 
 void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) {
@@ -247,10 +307,33 @@ void TelemetryBidiReactor::applyBackoffPolicy(const 
rmq::Settings& settings, std
 }
 
 void TelemetryBidiReactor::applyPublishingConfig(const rmq::Settings& 
settings, std::shared_ptr<Client> client) {
+  // The server may have implicitly assumed a namespace for the client.
+  if (!settings.publishing().topics().empty()) {
+    for (const auto& topic : settings.publishing().topics()) {
+      if (topic.resource_namespace() != client->config().resource_namespace) {
+        SPDLOG_INFO("Client namespace is changed from [{}] to [{}]", 
client->config().resource_namespace,
+                    topic.resource_namespace());
+        client->config().resource_namespace = topic.resource_namespace();
+        break;
+      }
+    }
+  }
   client->config().publisher.max_body_size = 
settings.publishing().max_body_size();
 }
 
 void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& 
settings, std::shared_ptr<Client> client) {
+  // The server may have implicitly assumed a namespace for the client.
+  if (!settings.subscription().subscriptions().empty()) {
+    for (const auto& subscription : settings.subscription().subscriptions()) {
+      if (subscription.topic().resource_namespace() != 
client->config().resource_namespace) {
+        SPDLOG_INFO("Client namespace is changed from [{}] to [{}]", 
client->config().resource_namespace,
+                    subscription.topic().resource_namespace());
+        client->config().resource_namespace = 
subscription.topic().resource_namespace();
+        break;
+      }
+    }
+  }
+
   client->config().subscriber.fifo = settings.subscription().fifo();
   auto polling_timeout =
       
google::protobuf::util::TimeUtil::DurationToMilliseconds(settings.subscription().long_polling_timeout());
@@ -259,48 +342,144 @@ void TelemetryBidiReactor::applySubscriptionConfig(const 
rmq::Settings& settings
 }
 
 void TelemetryBidiReactor::fireRead() {
-  SPDLOG_DEBUG("{}#fireRead", peer_address_);
+  absl::MutexLock lk(&state_mtx_);
+  if (StreamState::Created != read_state_) {
+    SPDLOG_DEBUG("Further read from {} is not allowed due to 
stream-state={}", peer_address_,
+                 static_cast<std::uint8_t>(read_state_));
+    return;
+  }
+  SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+               static_cast<std::uint8_t>(StreamState::Ready));
+  read_state_ = StreamState::Ready;
+  AddHold();
+  SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+               static_cast<std::uint8_t>(StreamState::Inflight));
+  read_state_ = StreamState::Inflight;
   StartRead(&read_);
 }
 
 void TelemetryBidiReactor::write(TelemetryCommand command) {
+  SPDLOG_DEBUG("{}#write", peer_address_);
+  {
+    absl::MutexLock lk(&state_mtx_);
+    // Reject incoming write commands if the stream state is closing or has 
witnessed some error.
+    switch (write_state_) {
+      case StreamState::Closing:
+      case StreamState::Error:
+      case StreamState::Closed:
+        return;
+      default:
+        // no-op
+        break;
+    }
+  }
+
   {
     absl::MutexLock lk(&writes_mtx_);
     writes_.push_back(command);
   }
-  fireWrite();
+  tryWriteNext();
 }
 
-void TelemetryBidiReactor::fireWrite() {
-  SPDLOG_DEBUG("{}#fireWrite", peer_address_);
+void TelemetryBidiReactor::tryWriteNext() {
+  SPDLOG_DEBUG("{}#tryWriteNext", peer_address_);
+  {
+    absl::MutexLock lk(&state_mtx_);
+    if (StreamState::Error == write_state_ || StreamState::Closed == 
write_state_) {
+      SPDLOG_WARN("Further write to {} is not allowed due to 
stream-state={}", peer_address_,
+                  static_cast<std::uint8_t>(write_state_));
+      return;
+    }
+  }
+
   {
     absl::MutexLock lk(&writes_mtx_);
-    if (writes_.empty()) {
+    if (writes_.empty() && StreamState::Closing != write_state_) {
       SPDLOG_DEBUG("No TelemetryCommand to write. Peer={}", peer_address_);
       return;
     }
 
-    bool expect = false;
-    if (command_inflight_.compare_exchange_strong(expect, true, 
std::memory_order_relaxed)) {
-      write_ = std::move(*writes_.begin());
-      writes_.erase(writes_.begin());
+    if (StreamState::Ready == write_state_) {
+      write_state_ = StreamState::Inflight;
+    }
+
+    if (writes_.empty()) {
+      // Tell the server there are no more write requests.
+      StartWritesDone();
     } else {
-      SPDLOG_DEBUG("Another command is already on the wire. Peer={}", 
peer_address_);
-      return;
+      SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, 
writes_.front().DebugString());
+      StartWrite(&(writes_.front()));
     }
   }
-  SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, 
write_.DebugString());
-  StartWrite(&write_);
 }
 
 void TelemetryBidiReactor::fireClose() {
   SPDLOG_INFO("{}#fireClose", peer_address_);
-  if (StreamState::Active == stream_state_) {
-    StartWritesDone();
-    {
-      absl::MutexLock lk(&stream_state_mtx_);
-      if (StreamState::Active == stream_state_) {
-        stream_state_cv_.Wait(&stream_state_mtx_);
+
+  {
+    // Acquire state lock
+    absl::MutexLock lk(&state_mtx_);
+
+    // Transition read state
+    switch (read_state_) {
+      case StreamState::Created:
+      case StreamState::Ready: {
+        SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                     static_cast<std::uint8_t>(StreamState::Closed));
+        read_state_ = StreamState::Closed;
+        state_cv_.SignalAll();
+        break;
+      }
+
+      case StreamState::Inflight: {
+        SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                     static_cast<std::uint8_t>(StreamState::Closing));
+        read_state_ = StreamState::Closing;
+        break;
+      }
+      case StreamState::Closing: {
+        break;
+      }
+      case StreamState::Closed:
+      case StreamState::Error: {
+        state_cv_.SignalAll();
+        break;
+      }
+    }
+
+    // Transition write state
+    switch (write_state_) {
+      case StreamState::Created:
+      case StreamState::Ready:
+      case StreamState::Inflight: {
+        SPDLOG_DEBUG("Change write-state {} --> {}", 
static_cast<std::uint8_t>(write_state_),
+                     static_cast<std::uint8_t>(StreamState::Closing));
+        write_state_ = StreamState::Closing;
+        break;
+      }
+      case StreamState::Closing: {
+        break;
+      }
+      case StreamState::Closed:
+      case StreamState::Error: {
+        state_cv_.SignalAll();
+        break;
+      }
+    }
+  }
+
+  if (StreamState::Closing == write_state_) {
+    tryWriteNext();
+  }
+
+  {
+    // Acquire state lock
+    absl::MutexLock lk(&state_mtx_);
+    while ((StreamState::Closed != read_state_ && StreamState::Error != 
read_state_) ||
+           (StreamState::Closed != write_state_ && StreamState::Error != 
write_state_)) {
+      if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) {
+        SPDLOG_WARN("StreamState CondVar timed out before getting signalled: 
read-state={}, write-state={}",
+                    static_cast<uint8_t>(read_state_), 
static_cast<uint8_t>(write_state_));
       }
     }
   }
@@ -308,6 +487,20 @@ void TelemetryBidiReactor::fireClose() {
 
 void TelemetryBidiReactor::OnWritesDoneDone(bool ok) {
   SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
+  assert(StreamState::Closing == write_state_);
+
+  absl::MutexLock lk(&state_mtx_);
+  // Remove the hold for the write stream.
+  RemoveHold();
+
+  if (!ok) {
+    write_state_ = StreamState::Error;
+    SPDLOG_WARN("Previous telemetry write to {} failed", peer_address_);
+  } else {
+    write_state_ = StreamState::Closed;
+    SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
+  }
+  state_cv_.SignalAll();
 }
 
 void TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) {
@@ -315,7 +508,7 @@ void 
TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) {
     absl::MutexLock lk(&writes_mtx_);
     writes_.emplace_back(command);
   }
-  fireWrite();
+  tryWriteNext();
 }
 
 /// Notifies the application that all operations associated with this RPC
@@ -334,9 +527,18 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& 
status) {
 
   {
     SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_);
-    absl::MutexLock lk(&stream_state_mtx_);
-    stream_state_ = StreamState::Closed;
-    stream_state_cv_.SignalAll();
+    absl::MutexLock lk(&state_mtx_);
+    if (StreamState::Error != read_state_) {
+      SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                   static_cast<std::uint8_t>(StreamState::Closed));
+      read_state_ = StreamState::Closed;
+    }
+    if (StreamState::Error != write_state_) {
+      SPDLOG_DEBUG("Change write-state {} --> {}", 
static_cast<std::uint8_t>(write_state_),
+                   static_cast<std::uint8_t>(StreamState::Closed));
+      write_state_ = StreamState::Closed;
+    }
+    state_cv_.SignalAll();
   }
 
   auto client = client_.lock();
@@ -349,4 +551,20 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& 
status) {
   }
 }
 
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
+void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) {
+  SPDLOG_DEBUG("{}#OnReadInitialMetadataDone", peer_address_);
+
+  if (!ok) {
+    absl::MutexLock lk(&state_mtx_);
+    SPDLOG_DEBUG("Change read-state {} --> {}", 
static_cast<std::uint8_t>(read_state_),
+                 static_cast<std::uint8_t>(StreamState::Error));
+    read_state_ = StreamState::Error;
+    state_cv_.SignalAll();
+    SPDLOG_WARN("Read of initial-metadata failed from {}", peer_address_);
+    return;
+  }
+
+  SPDLOG_DEBUG("Received initial metadata from {}", peer_address_);
+}
+
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/client/include/ClientManager.h 
b/cpp/source/client/include/ClientManager.h
index 56325fa4..02b232b2 100644
--- a/cpp/source/client/include/ClientManager.h
+++ b/cpp/source/client/include/ClientManager.h
@@ -22,14 +22,12 @@
 #include <system_error>
 
 #include "Client.h"
-#include "MessageExt.h"
 #include "Metadata.h"
 #include "ReceiveMessageCallback.h"
 #include "RpcClient.h"
 #include "Scheduler.h"
-#include "TelemetryBidiReactor.h"
+#include "SendResultCallback.h"
 #include "TopicRouteData.h"
-#include "rocketmq/SendCallback.h"
 #include "rocketmq/State.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
@@ -93,8 +91,10 @@ public:
   virtual void receiveMessage(const std::string& target, const Metadata& 
metadata, const ReceiveMessageRequest& request,
                               std::chrono::milliseconds timeout, 
ReceiveMessageCallback callback) = 0;
 
-  virtual bool send(const std::string& target_host, const Metadata& metadata, 
SendMessageRequest& request,
-                    SendCallback cb) = 0;
+  virtual bool send(const std::string& target_host,
+                    const Metadata& metadata,
+                    SendMessageRequest& request,
+                    SendResultCallback cb) = 0;
 
   virtual std::error_code notifyClientTermination(const std::string& 
target_host, const Metadata& metadata,
                                                   const 
NotifyClientTerminationRequest& request,
diff --git a/cpp/source/client/include/ClientManagerImpl.h 
b/cpp/source/client/include/ClientManagerImpl.h
index 653fcad3..5f1b27ca 100644
--- a/cpp/source/client/include/ClientManagerImpl.h
+++ b/cpp/source/client/include/ClientManagerImpl.h
@@ -20,7 +20,6 @@
 #include <chrono>
 #include <cstdint>
 #include <functional>
-#include <future>
 #include <memory>
 #include <string>
 #include <system_error>
@@ -29,18 +28,13 @@
 #include "Client.h"
 #include "ClientManager.h"
 #include "InsecureCertificateVerifier.h"
-#include "InvocationContext.h"
 #include "ReceiveMessageCallback.h"
 #include "RpcClientImpl.h"
-#include "SchedulerImpl.h"
-#include "SendMessageContext.h"
-#include "TelemetryBidiReactor.h"
 #include "ThreadPoolImpl.h"
 #include "TopicRouteData.h"
 #include "absl/base/thread_annotations.h"
 #include "absl/container/flat_hash_map.h"
 #include "absl/container/flat_hash_set.h"
-#include "absl/strings/string_view.h"
 #include "absl/synchronization/mutex.h"
 #include "rocketmq/State.h"
 
@@ -54,7 +48,7 @@ public:
    * effectively.
    * @param resource_namespace Abstract resource namespace, in which this 
client manager lives.
    */
-  explicit ClientManagerImpl(std::string resource_namespace, bool withSsl = 
true);
+  explicit ClientManagerImpl(std::string resource_namespace, bool with_ssl = 
true);
 
   ~ClientManagerImpl() override;
 
@@ -89,7 +83,7 @@ public:
   bool send(const std::string& target_host,
             const Metadata& metadata,
             SendMessageRequest& request,
-            SendCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_);
+            SendResultCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_);
 
   /**
    * Get a RpcClient according to the given target hosts, which follows scheme 
specified
@@ -105,7 +99,7 @@ public:
   RpcClientSharedPtr getRpcClient(const std::string& target_host, bool 
need_heartbeat = true) override
       LOCKS_EXCLUDED(rpc_clients_mtx_);
 
-  static SendReceipt processSendResponse(const rmq::MessageQueue& 
message_queue,
+  static SendResult processSendResponse(const rmq::MessageQueue& message_queue,
                                          const SendMessageResponse& response,
                                          std::error_code& ec);
 
@@ -242,7 +236,7 @@ private:
   grpc::ChannelArguments channel_arguments_;
 
   bool trace_{false};
-  bool withSsl_;
+  bool with_ssl_;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/include/rocketmq/SendReceipt.h 
b/cpp/source/client/include/SendResult.h
similarity index 88%
copy from cpp/include/rocketmq/SendReceipt.h
copy to cpp/source/client/include/SendResult.h
index 489df5ec..3596f61f 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/source/client/include/SendResult.h
@@ -16,20 +16,18 @@
  */
 #pragma once
 
-#include <cstdint>
-#include <string>
-#include <utility>
+#include <system_error>
 
-#include "RocketMQ.h"
+#include "rocketmq/RocketMQ.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-struct SendReceipt {
-  std::string message_id;
+struct SendResult {
+  std::error_code ec;
+  std::string target;
 
+  std::string message_id;
   std::string transaction_id;
-
-  std::string target;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/include/rocketmq/SendReceipt.h 
b/cpp/source/client/include/SendResultCallback.h
similarity index 80%
copy from cpp/include/rocketmq/SendReceipt.h
copy to cpp/source/client/include/SendResultCallback.h
index 489df5ec..182f649a 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/source/client/include/SendResultCallback.h
@@ -16,20 +16,12 @@
  */
 #pragma once
 
-#include <cstdint>
-#include <string>
-#include <utility>
+#include <functional>
 
-#include "RocketMQ.h"
+#include "SendResult.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-struct SendReceipt {
-  std::string message_id;
-
-  std::string transaction_id;
-
-  std::string target;
-};
+using SendResultCallback = std::function<void(const SendResult&)>;
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/TelemetryBidiReactor.h 
b/cpp/source/client/include/TelemetryBidiReactor.h
index 9fe65f31..3bdbe3d3 100644
--- a/cpp/source/client/include/TelemetryBidiReactor.h
+++ b/cpp/source/client/include/TelemetryBidiReactor.h
@@ -19,6 +19,7 @@
 #include <atomic>
 #include <chrono>
 #include <cstdint>
+#include <list>
 #include <memory>
 #include <utility>
 #include <vector>
@@ -34,12 +35,29 @@ ROCKETMQ_NAMESPACE_BEGIN
 enum class StreamState : std::uint8_t
 {
   Created = 0,
-  Active = 1,
-  ReadDone = 2,
-  WriteDone = 3,
+  Ready = 1,
+  Inflight = 2,
+  Closing = 3,
   Closed = 4,
+  Error = 5,
 };
 
+/// write-stream-state: created --> ready --> inflight --> ready --> ...
+///                                                    --> error
+///                                                    --> closing --> closed
+///                                       --> closing  --> closed
+///                                                    --> error
+///
+///
+/// read-stream-state: created --> ready --> inflight --> inflight
+///                                                   --> closing --> closed
+///                                                   --> error
+///                                      --> closed
+/// requirement:
+///    1, fireClose --> blocking await till bidireactor is closed;
+///    2, when session is closed and client is still active, recreate a new 
session to accept incoming commands from server;
+///    3, after writing the first Settings telemetry command, launch the read 
directional stream
+///
 class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, 
TelemetryCommand>,
                              public 
std::enable_shared_from_this<TelemetryBidiReactor> {
 public:
@@ -47,24 +65,54 @@ public:
 
   ~TelemetryBidiReactor();
 
-  void OnWriteDone(bool ok) override;
-
-  void OnWritesDoneDone(bool ok) override;
-
-  void OnReadDone(bool ok) override;
-
+  /// Notifies the application that all operations associated with this RPC
+  /// have completed and all Holds have been removed. OnDone provides the RPC
+  /// status outcome for both successful and failed RPCs and will be called in
+  /// all cases. If it is not called, it indicates an application-level problem
+  /// (like failure to remove a hold).
+  ///
+  /// \param[in] status The status outcome of this RPC
   void OnDone(const grpc::Status& status) override;
 
-  void fireRead();
+  /// Notifies the application that a read of initial metadata from the
+  /// server is done. If the application chooses not to implement this method,
+  /// it can assume that the initial metadata has been read before the first
+  /// call of OnReadDone or OnDone.
+  ///
+  /// \param[in] ok Was the initial metadata read successfully? If false, no
+  ///               new read/write operation will succeed, and any further
+  ///               Start* operations should not be called.
+  void OnReadInitialMetadataDone(bool /*ok*/) override;
+
+  /// Notifies the application that a StartRead operation completed.
+  ///
+  /// \param[in] ok Was it successful? If false, no new read/write operation
+  ///               will succeed, and any further Start* should not be called.
+  void OnReadDone(bool ok) override;
 
-  void fireWrite();
+  /// Notifies the application that a StartWrite or StartWriteLast operation
+  /// completed.
+  ///
+  /// \param[in] ok Was it successful? If false, no new read/write operation
+  ///               will succeed, and any further Start* should not be called.
+  void OnWriteDone(bool ok) override;
 
-  void fireClose();
+  /// Notifies the application that a StartWritesDone operation completed. Note
+  /// that this is only used on explicit StartWritesDone operations and not for
+  /// those that are implicitly invoked as part of a StartWriteLast.
+  ///
+  /// \param[in] ok Was it successful? If false, the application will later see
+  ///               the failure reflected as a bad status in OnDone and no
+  ///               further Start* should be called.
+  void OnWritesDoneDone(bool ok) override;
 
+  /// Core API method to initiate this bidirectional stream.
   void write(TelemetryCommand command);
 
   bool await();
 
+  void fireClose();
+
 private:
   grpc::ClientContext context_;
 
@@ -75,17 +123,15 @@ private:
 
   /**
    * @brief Buffered commands to write to server
+   *
+   * TODO: move buffered commands to a shared container, which may survive
+   * multiple TelemetryBidiReactor lifecycles.
    */
-  std::vector<TelemetryCommand> writes_ GUARDED_BY(writes_mtx_);
+  std::list<TelemetryCommand> writes_ GUARDED_BY(writes_mtx_);
   absl::Mutex writes_mtx_;
 
   /**
-   * @brief The command that is currently being written back to server.
-   */
-  TelemetryCommand write_;
-
-  /**
-   * @brief Each TelemetryBidiReactor belongs to a specific client as its 
owner. 
+   * @brief Each TelemetryBidiReactor belongs to a specific client as its 
owner.
    */
   std::weak_ptr<Client> client_;
 
@@ -94,18 +140,12 @@ private:
    */
   std::string peer_address_;
 
-  /**
-   * @brief Indicate if there is a command being written to network.
-   */
-  std::atomic_bool command_inflight_{false};
-
-  StreamState stream_state_ GUARDED_BY(stream_state_mtx_);
-  absl::Mutex stream_state_mtx_;
-  absl::CondVar stream_state_cv_;
+  StreamState read_state_ GUARDED_BY(state_mtx_);
+  StreamState write_state_ GUARDED_BY(state_mtx_);
+  absl::Mutex state_mtx_;
+  absl::CondVar state_cv_;
 
-  bool server_setting_received_ 
GUARDED_BY(server_setting_received_mtx_){false};
-  absl::Mutex server_setting_received_mtx_;
-  absl::CondVar server_setting_received_cv_;
+  void changeStreamStateThenNotify(StreamState state);
 
   void onVerifyMessageResult(TelemetryCommand command);
 
@@ -116,6 +156,14 @@ private:
   void applyPublishingConfig(const rmq::Settings& settings, 
std::shared_ptr<Client> client);
 
   void applySubscriptionConfig(const rmq::Settings& settings, 
std::shared_ptr<Client> client);
+
+  /// Start the read stream.
+  ///
+  /// Once OnReadDone reports ok, call StartRead again immediately.
+  void fireRead();
+
+  /// Attempt to write pending telemetry command to server.
+  void tryWriteNext() LOCKS_EXCLUDED(state_mtx_, writes_mtx_);
 };
 
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/exports.map b/cpp/source/exports.map
new file mode 100644
index 00000000..99ae5d1d
--- /dev/null
+++ b/cpp/source/exports.map
@@ -0,0 +1,6 @@
+{
+    global:
+        *rocketmq*;
+    local:
+        *;
+};
\ No newline at end of file
diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/source/rocketmq/FifoContext.cpp
similarity index 70%
copy from cpp/include/rocketmq/SendReceipt.h
copy to cpp/source/rocketmq/FifoContext.cpp
index 489df5ec..1cf5bab2 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/source/rocketmq/FifoContext.cpp
@@ -14,22 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#pragma once
+#include "FifoContext.h"
 
-#include <cstdint>
-#include <string>
-#include <utility>
-
-#include "RocketMQ.h"
+#include "rocketmq/RocketMQ.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-struct SendReceipt {
-  std::string message_id;
-
-  std::string transaction_id;
+FifoContext::FifoContext(MessageConstPtr message, SendCallback callback)
+    : message(std::move(message)), callback(callback) {
+}
 
-  std::string target;
-};
+FifoContext::FifoContext(FifoContext&& rhs) noexcept {
+  this->message = std::move(rhs.message);
+  this->callback = rhs.callback;
+}
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/FifoProducer.cpp b/cpp/source/rocketmq/FifoProducer.cpp
new file mode 100644
index 00000000..b52630b0
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducer.cpp
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "rocketmq/FifoProducer.h"
+
+#include <cstddef>
+#include <memory>
+
+#include "FifoProducerImpl.h"
+#include "ProducerImpl.h"
+#include "StaticNameServerResolver.h"
+#include "rocketmq/Configuration.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+FifoProducerBuilder FifoProducer::newBuilder() {
+  return {};
+}
+
+FifoProducerBuilder::FifoProducerBuilder() : producer_impl_(std::make_shared<ProducerImpl>()) {
+}
+
+FifoProducerBuilder& FifoProducerBuilder::withConfiguration(Configuration configuration) {
+  auto name_server_resolver = std::make_shared<StaticNameServerResolver>(configuration.endpoints());
+  producer_impl_->withNameServerResolver(std::move(name_server_resolver));
+  producer_impl_->withCredentialsProvider(configuration.credentialsProvider());
+  producer_impl_->withRequestTimeout(configuration.requestTimeout());
+  producer_impl_->withSsl(configuration.withSsl());
+  return *this;
+}
+
+FifoProducerBuilder& FifoProducerBuilder::withTopics(const std::vector<std::string>& topics) {
+  producer_impl_->withTopics(topics);
+  return *this;
+}
+
+FifoProducerBuilder& FifoProducerBuilder::withConcurrency(std::size_t concurrency) {
+  this->impl_ = std::make_shared<FifoProducerImpl>(producer_impl_, concurrency);
+  return *this;
+}
+
+FifoProducer FifoProducerBuilder::build() {
+  FifoProducer fifo_producer(this->impl_);
+  fifo_producer.start();
+  return fifo_producer;
+}
+
+void FifoProducer::start() {
+  impl_->internalProducer()->start();
+}
+
+void FifoProducer::send(MessageConstPtr message, SendCallback callback) {
+  impl_->send(std::move(message), callback);
+}
+
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/source/rocketmq/FifoProducerImpl.cpp
similarity index 64%
copy from cpp/include/rocketmq/SendReceipt.h
copy to cpp/source/rocketmq/FifoProducerImpl.cpp
index 489df5ec..ad08c4d9 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/source/rocketmq/FifoProducerImpl.cpp
@@ -14,22 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#pragma once
+#include "FifoProducerImpl.h"
 
-#include <cstdint>
-#include <string>
 #include <utility>
 
-#include "RocketMQ.h"
+#include "FifoContext.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-struct SendReceipt {
-  std::string message_id;
+void FifoProducerImpl::send(MessageConstPtr message, SendCallback callback) {
+  auto& group = message->group();
+  std::size_t hash = hash_fn_(group);
+  std::size_t slot = hash % concurrency_;
 
-  std::string transaction_id;
-
-  std::string target;
-};
+  FifoContext context(std::move(message), callback);
+  partitions_[slot]->add(std::move(context));
+}
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
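The slot selection in FifoProducerImpl::send can be read as "hash the message group, then take it modulo the concurrency". A minimal standalone sketch of that idea follows; `slotOf` is a hypothetical helper that is not part of the SDK, and the positive-concurrency precondition is an assumption added here for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical helper (not in the SDK): maps a message group to a partition slot.
// Messages that share a group always land in the same slot, so their relative
// order can be preserved by sending each slot's messages one at a time.
inline std::size_t slotOf(const std::string& group, std::size_t concurrency) {
  // Assumption for this sketch: concurrency is positive; zero would divide by zero.
  assert(concurrency > 0);
  return std::hash<std::string>{}(group) % concurrency;
}
```

The exact slot for a given group depends on the standard library's std::hash, so only stability within one process should be relied upon.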
diff --git a/cpp/source/rocketmq/FifoProducerPartition.cpp b/cpp/source/rocketmq/FifoProducerPartition.cpp
new file mode 100644
index 00000000..8a2f06ff
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducerPartition.cpp
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FifoProducerPartition.h"
+
+#include "absl/synchronization/mutex.h"
+
+#include <atomic>
+#include <memory>
+#include <system_error>
+
+#include "FifoContext.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+#include "rocketmq/SendReceipt.h"
+#include "spdlog/spdlog.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+void FifoProducerPartition::add(FifoContext&& context) {
+  {
+    absl::MutexLock lk(&messages_mtx_);
+    messages_.emplace_back(std::move(context));
+    SPDLOG_DEBUG("{} has {} pending messages after #add", name_, messages_.size());
+  }
+
+  trySend();
+}
+
+void FifoProducerPartition::trySend() {
+  bool expected = false;
+  if (inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
+    absl::MutexLock lk(&messages_mtx_);
+
+    if (messages_.empty()) {
+      SPDLOG_DEBUG("There are no more messages to send");
+      return;
+    }
+
+    FifoContext& ctx = messages_.front();
+    MessageConstPtr message = std::move(ctx.message);
+    SendCallback send_callback = ctx.callback;
+
+    std::shared_ptr<FifoProducerPartition> partition = shared_from_this();
+    auto fifo_callback = [=](const std::error_code& ec, const SendReceipt& receipt) mutable {
+      partition->onComplete(ec, receipt, send_callback);
+    };
+    SPDLOG_DEBUG("Sending FIFO message from {}", name_);
+    producer_->send(std::move(message), fifo_callback);
+    messages_.pop_front();
+    SPDLOG_DEBUG("In addition to the inflight one, there are {} messages pending in {}", messages_.size(), name_);
+  } else {
+    SPDLOG_DEBUG("There is an inflight message");
+  }
+}
+
+void FifoProducerPartition::onComplete(const std::error_code& ec, const SendReceipt& receipt, SendCallback& callback) {
+  if (ec) {
+    SPDLOG_INFO("{} completed with a failure: {}", name_, ec.message());
+  } else {
+    SPDLOG_DEBUG("{} completed OK", name_);
+  }
+
+  if (!ec) {
+    callback(ec, receipt);
+    // update inflight status
+    bool expected = true;
+    if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
+      trySend();
+    } else {
+      SPDLOG_ERROR("{}: Unexpected inflight status", name_);
+    }
+    return;
+  }
+
+  // Put the message back to the front of the list
+  SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+  FifoContext retry_context(std::move(receipt_mut.message), callback);
+  {
+    absl::MutexLock lk(&messages_mtx_);
+    messages_.emplace_front(std::move(retry_context));
+  }
+
+  // Update inflight status
+  bool expected = true;
+  if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
+    trySend();
+  } else {
+    SPDLOG_ERROR("Unexpected inflight status");
+  }
+}
+
+ROCKETMQ_NAMESPACE_END
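The partition above serializes sends with an atomic "inflight" flag: whoever flips it from false to true may dispatch the head of the queue, and completion flips it back. A minimal standalone sketch of that gating pattern follows. `SerialGate`, `Sender`, and `pending()` are hypothetical names, std::mutex stands in for absl::Mutex, and the memory orders are chosen for this sketch rather than taken from the commit. The sketch also releases the flag when the queue turns out to be empty, so that a later add() is not blocked.

```cpp
#include <atomic>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for a FIFO partition: at most one item is in flight.
class SerialGate {
public:
  using Sender = std::function<void(const std::string&)>;

  explicit SerialGate(Sender sender) : sender_(std::move(sender)) {}

  // Enqueue an item, then try to dispatch if nothing is in flight.
  void add(std::string item) {
    {
      std::lock_guard<std::mutex> lk(mtx_);
      pending_.push_back(std::move(item));
    }
    trySend();
  }

  // To be called once the in-flight item has completed.
  void onComplete() {
    inflight_.store(false, std::memory_order_release);
    trySend();
  }

  std::size_t pending() {
    std::lock_guard<std::mutex> lk(mtx_);
    return pending_.size();
  }

private:
  void trySend() {
    bool expected = false;
    if (!inflight_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
      return;  // Another item is already in flight.
    }
    std::string item;
    {
      std::lock_guard<std::mutex> lk(mtx_);
      if (pending_.empty()) {
        // Release the gate while still holding the lock, so a concurrent add()
        // that enqueues right afterwards will win the flag itself.
        inflight_.store(false, std::memory_order_release);
        return;
      }
      item = std::move(pending_.front());
      pending_.pop_front();
    }
    sender_(item);  // Dispatch outside the lock.
  }

  Sender sender_;
  std::mutex mtx_;
  std::deque<std::string> pending_;
  std::atomic<bool> inflight_{false};
};
```

In this sketch the sender is invoked outside the lock, which keeps a synchronous completion callback from deadlocking on the queue mutex.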
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 78d812ed..907d0a28 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -21,12 +21,8 @@
 #include <system_error>
 #include <utility>
 
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
-#include "MixAll.h"
 #include "ProducerImpl.h"
 #include "StaticNameServerResolver.h"
-#include "absl/strings/str_split.h"
 #include "rocketmq/ErrorCode.h"
 #include "rocketmq/SendReceipt.h"
 #include "rocketmq/Transaction.h"
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp
index 73130161..34c5b29c 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -17,38 +17,27 @@
 #include "ProducerImpl.h"
 
 #include <algorithm>
-#include <apache/rocketmq/v2/definition.pb.h>
-
 #include <atomic>
 #include <cassert>
 #include <chrono>
-#include <limits>
 #include <memory>
 #include <system_error>
 #include <utility>
 
-#include "Client.h"
-#include "MessageGroupQueueSelector.h"
-#include "MetadataConstants.h"
+#include "apache/rocketmq/v2/definition.pb.h"
 #include "MixAll.h"
 #include "Protocol.h"
 #include "PublishInfoCallback.h"
-#include "RpcClient.h"
 #include "SendContext.h"
-#include "SendMessageContext.h"
 #include "Signature.h"
-#include "Tag.h"
 #include "TracingUtility.h"
 #include "TransactionImpl.h"
-#include "UniqueIdGenerator.h"
 #include "UtilAll.h"
-#include "absl/strings/str_join.h"
 #include "opencensus/trace/propagation/trace_context.h"
 #include "opencensus/trace/span.h"
 #include "rocketmq/ErrorCode.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/SendReceipt.h"
-#include "rocketmq/Tracing.h"
 #include "rocketmq/Transaction.h"
 #include "rocketmq/TransactionChecker.h"
 
@@ -203,19 +192,30 @@ void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageReq
 SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) noexcept {
   ensureRunning(ec);
   if (ec) {
-    return {};
+    SPDLOG_WARN("Producer is not running");
+    SendReceipt send_receipt{};
+    send_receipt.message = std::move(message);
+    return send_receipt;
   }
 
   auto topic_publish_info = getPublishInfo(message->topic());
   if (!topic_publish_info) {
+    SPDLOG_WARN("Route of topic[{}] is not found", message->topic());
     ec = ErrorCode::NotFound;
-    return {};
+    SendReceipt send_receipt{};
+    send_receipt.message = std::move(message);
+    return send_receipt;
   }
 
   std::vector<rmq::MessageQueue> message_queue_list;
-  if (!topic_publish_info->selectMessageQueues(absl::make_optional<std::string>(), message_queue_list)) {
+  // An empty optional (absl::nullopt): no message group is specified
+  absl::optional<std::string> message_group{};
+  if (!topic_publish_info->selectMessageQueues(message_group, message_queue_list)) {
+    SPDLOG_WARN("Failed to select an addressable message queue for topic[{}]", message->topic());
     ec = ErrorCode::NotFound;
-    return {};
+    SendReceipt send_receipt{};
+    send_receipt.message = std::move(message);
+    return send_receipt;
   }
 
   auto mtx = std::make_shared<absl::Mutex>();
@@ -224,9 +224,10 @@ SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) noe
   SendReceipt   send_receipt;
 
   // Define callback
-  auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& receipt) {
+  auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& receipt) mutable {
     ec = code;
-    send_receipt = receipt;
+    SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+    send_receipt.message = std::move(receipt_mut.message);
     {
       absl::MutexLock lk(mtx.get());
       completed = true;
@@ -251,6 +252,7 @@ void ProducerImpl::send(MessageConstPtr message, SendCallback cb) {
   ensureRunning(ec);
   if (ec) {
     SendReceipt send_receipt;
+    send_receipt.message = std::move(message);
     cb(ec, send_receipt);
   }
 
@@ -264,6 +266,7 @@ void ProducerImpl::send(MessageConstPtr message, SendCallback cb) {
     // No route entries of the given topic is available
     if (ec) {
       SendReceipt send_receipt;
+      send_receipt.message = std::move(ptr);
       cb(ec, send_receipt);
       return;
     }
@@ -271,6 +274,7 @@ void ProducerImpl::send(MessageConstPtr message, SendCallback cb) {
     if (!publish_info) {
       std::error_code ec = ErrorCode::NotFound;
       SendReceipt     send_receipt;
+      send_receipt.message = std::move(ptr);
       cb(ec, send_receipt);
       return;
     }
@@ -280,6 +284,7 @@ void ProducerImpl::send(MessageConstPtr message, SendCallback cb) {
     if (!publish_info->selectMessageQueues(ptr->group(), message_queue_list)) {
       std::error_code ec = ErrorCode::NotFound;
       SendReceipt     send_receipt;
+      send_receipt.message = std::move(ptr);
       cb(ec, send_receipt);
       return;
     }
@@ -338,12 +343,12 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) {
   Metadata metadata;
   Signature::sign(client_config_, metadata);
 
-  auto callback = [context](const std::error_code& ec, const SendReceipt& send_receipt) {
-    if (ec) {
-      context->onFailure(ec);
+  auto callback = [context](const SendResult& send_result) {
+    if (send_result.ec) {
+      context->onFailure(send_result.ec);
       return;
     }
-    context->onSuccess(send_receipt);
+    context->onSuccess(send_result);
   };
 
   client_manager_->send(target, metadata, request, callback);
@@ -354,12 +359,14 @@ void ProducerImpl::send0(MessageConstPtr message, SendCallback callback, std::ve
   std::error_code ec;
   validate(*message, ec);
   if (ec) {
+    send_receipt.message = std::move(message);
     callback(ec, send_receipt);
     return;
   }
 
   if (list.empty()) {
     ec = ErrorCode::NotFound;
+    send_receipt.message = std::move(message);
     callback(ec, send_receipt);
     return;
   }
diff --git a/cpp/source/rocketmq/SendContext.cpp b/cpp/source/rocketmq/SendContext.cpp
index 385a1a99..bd97384d 100644
--- a/cpp/source/rocketmq/SendContext.cpp
+++ b/cpp/source/rocketmq/SendContext.cpp
@@ -21,41 +21,44 @@
 #include "ProducerImpl.h"
 #include "PublishStats.h"
 #include "Tag.h"
-#include "TransactionImpl.h"
-#include "opencensus/trace/propagation/trace_context.h"
 #include "opencensus/trace/span.h"
-#include "rocketmq/Logger.h"
+#include "rocketmq/ErrorCode.h"
 #include "rocketmq/SendReceipt.h"
 #include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-void SendContext::onSuccess(const SendReceipt& send_receipt) noexcept {
+void SendContext::onSuccess(const SendResult& send_result) noexcept {
   {
     // Mark end of send-message span.
     span_.SetStatus(opencensus::trace::StatusCode::OK);
     span_.End();
   }
 
-  auto publisher = producer_.lock();
-  if (!publisher) {
+  auto producer = producer_.lock();
+  if (!producer) {
+    SPDLOG_WARN("Producer has been destructed");
     return;
   }
 
   // Collect metrics
   {
     auto duration = std::chrono::steady_clock::now() - request_time_;
-    opencensus::stats::Record({{publisher->stats().latency(), MixAll::millisecondsOf(duration)}},
+    opencensus::stats::Record({{producer->stats().latency(), MixAll::millisecondsOf(duration)}},
                               {
                                   {Tag::topicTag(), message_->topic()},
-                                  {Tag::clientIdTag(), publisher->config().client_id},
+                                  {Tag::clientIdTag(), producer->config().client_id},
                                   {Tag::invocationStatusTag(), "success"},
                               });
   }
 
   // send_receipt.traceContext(opencensus::trace::propagation::ToTraceParentHeader(span_.context()));
-  std::error_code ec;
-  callback_(ec, send_receipt);
+  SendReceipt send_receipt = {};
+  send_receipt.target = send_result.target;
+  send_receipt.message_id = send_result.message_id;
+  send_receipt.transaction_id = send_result.transaction_id;
+  send_receipt.message = std::move(message_);
+  callback_(send_result.ec, send_receipt);
 }
 
 void SendContext::onFailure(const std::error_code& ec) noexcept {
@@ -65,38 +68,36 @@ void SendContext::onFailure(const std::error_code& ec) noexcept {
     span_.End();
   }
 
-  auto publisher = producer_.lock();
-  if (!publisher) {
+  auto producer = producer_.lock();
+  if (!producer) {
+    SPDLOG_WARN("Producer has been destructed");
     return;
   }
 
   // Collect metrics
   {
     auto duration = std::chrono::steady_clock::now() - request_time_;
-    opencensus::stats::Record({{publisher->stats().latency(), MixAll::millisecondsOf(duration)}},
+    opencensus::stats::Record({{producer->stats().latency(), MixAll::millisecondsOf(duration)}},
                               {
                                   {Tag::topicTag(), message_->topic()},
-                                  {Tag::clientIdTag(), publisher->config().client_id},
+                                  {Tag::clientIdTag(), producer->config().client_id},
                                   {Tag::invocationStatusTag(), "failure"},
                               });
   }
 
-  if (++attempt_times_ >= publisher->maxAttemptTimes()) {
-    SPDLOG_WARN("Retried {} times, which exceeds the limit: {}", attempt_times_, publisher->maxAttemptTimes());
-    callback_(ec, {});
-    return;
-  }
-
-  std::shared_ptr<ProducerImpl> producer = producer_.lock();
-  if (!producer) {
-    SPDLOG_WARN("Producer has been destructed");
-    callback_(ec, {});
+  if (++attempt_times_ >= producer->maxAttemptTimes()) {
+    SPDLOG_WARN("Retried {} times, which exceeds the limit: {}", attempt_times_, producer->maxAttemptTimes());
+    SendReceipt receipt{};
+    receipt.message = std::move(message_);
+    callback_(ec, receipt);
     return;
   }
 
   if (candidates_.empty()) {
     SPDLOG_WARN("No alternative hosts to perform additional retries");
-    callback_(ec, {});
+    SendReceipt receipt{};
+    receipt.message = std::move(message_);
+    callback_(ec, receipt);
     return;
   }
 
@@ -106,7 +107,7 @@ void SendContext::onFailure(const std::error_code& ec) noexcept {
   auto ctx = shared_from_this();
   // If publish message requests are throttled, retry after backoff
   if (ErrorCode::TooManyRequests == ec) {
-    auto&& backoff = publisher->backoff(attempt_times_);
+    auto&& backoff = producer->backoff(attempt_times_);
     SPDLOG_DEBUG("Publish message[topic={}, message-id={}] is throttled. Retry after {}ms", message_->topic(),
                  message_->id(), MixAll::millisecondsOf(backoff));
     auto retry_cb = [=]() { producer->sendImpl(ctx); };
diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h
index c266047a..d7693962 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -94,8 +94,8 @@ public:
     client_config_.request_timeout = absl::FromChrono(request_timeout);
   }
 
-  void withSsl(bool enable) {
-    client_config_.withSsl = enable;
+  void withSsl(bool with_ssl) {
+    client_config_.withSsl = with_ssl;
   }
 
   /**
diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/source/rocketmq/include/FifoContext.h
similarity index 74%
copy from cpp/include/rocketmq/SendReceipt.h
copy to cpp/source/rocketmq/include/FifoContext.h
index 489df5ec..2fc3492e 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/source/rocketmq/include/FifoContext.h
@@ -16,20 +16,19 @@
  */
 #pragma once
 
-#include <cstdint>
-#include <string>
-#include <utility>
-
-#include "RocketMQ.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-struct SendReceipt {
-  std::string message_id;
+struct FifoContext {
+  MessageConstPtr message;
+  SendCallback callback;
 
-  std::string transaction_id;
+  FifoContext(MessageConstPtr message, SendCallback callback);
 
-  std::string target;
+  FifoContext(FifoContext&& rhs) noexcept;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerImpl.h b/cpp/source/rocketmq/include/FifoProducerImpl.h
new file mode 100644
index 00000000..ffc9c8a6
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoProducerImpl.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "FifoProducerPartition.h"
+#include "ProducerImpl.h"
+#include "fmt/format.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class FifoProducerImpl : std::enable_shared_from_this<FifoProducerImpl> {
+public:
+  FifoProducerImpl(std::shared_ptr<ProducerImpl> producer, std::size_t concurrency)
+      : producer_(producer), concurrency_(concurrency), partitions_(concurrency) {
+    for (auto i = 0; i < concurrency; i++) {
+      partitions_[i] = std::make_shared<FifoProducerPartition>(producer_, fmt::format("slot-{}", i));
+    }
+  };
+
+  void send(MessageConstPtr message, SendCallback callback);
+
+  std::shared_ptr<ProducerImpl>& internalProducer() {
+    return producer_;
+  }
+
+private:
+  std::shared_ptr<ProducerImpl> producer_;
+  std::vector<std::shared_ptr<FifoProducerPartition>> partitions_;
+  std::size_t concurrency_;
+  std::hash<std::string> hash_fn_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerPartition.h b/cpp/source/rocketmq/include/FifoProducerPartition.h
new file mode 100644
index 00000000..8d0e00d8
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoProducerPartition.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "absl/base/internal/thread_annotations.h"
+
+#include <atomic>
+#include <list>
+#include <memory>
+#include <system_error>
+
+#include "FifoContext.h"
+#include "ProducerImpl.h"
+#include "absl/base/thread_annotations.h"
+#include "absl/synchronization/mutex.h"
+#include "rocketmq/SendCallback.h"
+#include "rocketmq/SendReceipt.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class FifoProducerPartition : public std::enable_shared_from_this<FifoProducerPartition> {
+public:
+  FifoProducerPartition(std::shared_ptr<ProducerImpl> producer, std::string&& name)
+      : producer_(producer), name_(std::move(name)) {
+  }
+
+  void add(FifoContext&& context) LOCKS_EXCLUDED(messages_mtx_);
+
+  void trySend() LOCKS_EXCLUDED(messages_mtx_);
+
+  void onComplete(const std::error_code& ec, const SendReceipt& receipt, SendCallback& callback);
+
+private:
+  std::shared_ptr<ProducerImpl> producer_;
+  std::list<FifoContext> messages_ GUARDED_BY(messages_mtx_);
+  absl::Mutex messages_mtx_;
+  std::atomic_bool inflight_{false};
+  std::string name_;
+};
+
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h b/cpp/source/rocketmq/include/ProducerImpl.h
index d7260a93..b572f20d 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -16,28 +16,23 @@
  */
 #pragma once
 
-#include <chrono>
 #include <memory>
-#include <mutex>
 #include <string>
 #include <system_error>
 
 #include "ClientImpl.h"
-#include "ClientManagerImpl.h"
 #include "MixAll.h"
 #include "PublishInfoCallback.h"
+#include "PublishStats.h"
 #include "SendContext.h"
 #include "TopicPublishInfo.h"
 #include "TransactionImpl.h"
 #include "absl/container/flat_hash_map.h"
 #include "absl/container/flat_hash_set.h"
-#include "absl/strings/string_view.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/SendCallback.h"
 #include "rocketmq/SendReceipt.h"
-#include "rocketmq/State.h"
 #include "rocketmq/TransactionChecker.h"
-#include "PublishStats.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -53,8 +48,22 @@ public:
 
   void shutdown() override;
 
+  /**
+   * Note that we require the application to transfer ownership of the message to send, to avoid concurrent
+   * modification while it is being sent.
+   *
+   * Regardless of the send result, SendReceipt holds the std::unique_ptr<const Message>, allowing the
+   * application to implement a customized retry policy.
+   */
   SendReceipt send(MessageConstPtr message, std::error_code& ec) noexcept;
 
+  /**
+   * Note that we require the application to transfer ownership of the message to send, to avoid concurrent
+   * modification while it is being sent.
+   *
+   * Regardless of the send result, SendReceipt holds the std::unique_ptr<const Message>, allowing the
+   * application to implement a customized retry policy.
+   */
   void send(MessageConstPtr message, SendCallback callback);
 
   void setTransactionChecker(TransactionChecker checker);
@@ -64,6 +73,13 @@ public:
     return absl::make_unique<TransactionImpl>(producer);
   }
 
+  /**
+   * Note that we require the application to transfer ownership of the message to send, to avoid concurrent
+   * modification while it is being sent.
+   *
+   * TODO: Refine this API. The current API is not good enough, as it cannot hand the message back to its caller on
+   * publish failure.
+   */
   void send(MessageConstPtr message, std::error_code& ec, Transaction& transaction);
 
   /**
diff --git a/cpp/source/rocketmq/include/SendContext.h b/cpp/source/rocketmq/include/SendContext.h
index 4c05cebe..4067532b 100644
--- a/cpp/source/rocketmq/include/SendContext.h
+++ b/cpp/source/rocketmq/include/SendContext.h
@@ -19,16 +19,12 @@
 #include <memory>
 #include <system_error>
 
-#include "absl/container/flat_hash_map.h"
-#include "absl/synchronization/mutex.h"
-#include "opencensus/trace/span.h"
-
 #include "Protocol.h"
+#include "SendResult.h"
 #include "TransactionImpl.h"
-#include "rocketmq/ErrorCode.h"
+#include "opencensus/trace/span.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/SendCallback.h"
-#include "rocketmq/SendReceipt.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -47,7 +43,7 @@ public:
         span_(opencensus::trace::Span::BlankSpan()) {
   }
 
-  void onSuccess(const SendReceipt& send_receipt) noexcept;
+  void onSuccess(const SendResult& send_result) noexcept;
 
   void onFailure(const std::error_code& ec) noexcept;
 
diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
index 45aa61b9..7fc63b6d 100644
--- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
+++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
@@ -25,7 +25,7 @@
 using namespace std::chrono;
 ROCKETMQ_NAMESPACE_BEGIN
 
-class SimpleConsumerImpl : public ClientImpl, public std::enable_shared_from_this<SimpleConsumerImpl> {
+class SimpleConsumerImpl : virtual public ClientImpl, public std::enable_shared_from_this<SimpleConsumerImpl> {
 public:
   SimpleConsumerImpl(std::string group);
 
diff --git a/cpp/source/rocketmq/tests/BUILD.bazel b/cpp/source/rocketmq/tests/BUILD.bazel
index a8d10e92..74751c35 100644
--- a/cpp/source/rocketmq/tests/BUILD.bazel
+++ b/cpp/source/rocketmq/tests/BUILD.bazel
@@ -59,4 +59,12 @@ cc_test(
         "ConsumeMessageServiceTest.cpp",
     ],
     deps = base_deps,
+)
+
+cc_test(
+    name = "optional_test",
+    srcs = [
+        "OptionalTest.cpp",
+    ],
+    deps = base_deps
 )
\ No newline at end of file
diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/source/rocketmq/tests/OptionalTest.cpp
similarity index 72%
copy from cpp/include/rocketmq/SendReceipt.h
copy to cpp/source/rocketmq/tests/OptionalTest.cpp
index 489df5ec..1266a73e 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/source/rocketmq/tests/OptionalTest.cpp
@@ -14,22 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#pragma once
+#include "gtest/gtest.h"
+#include "absl/types/optional.h"
+#include "rocketmq/RocketMQ.h"
 
-#include <cstdint>
 #include <string>
-#include <utility>
-
-#include "RocketMQ.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-struct SendReceipt {
-  std::string message_id;
+TEST(OptionalTest, test_optional) {
+    absl::optional<std::string> opt{};
+    ASSERT_EQ(false, opt.has_value());
 
-  std::string transaction_id;
+    auto opt2 = absl::make_optional<std::string>();
+    ASSERT_EQ(true, opt2.has_value());
+}
 
-  std::string target;
-};
+ROCKETMQ_NAMESPACE_END
 
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
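The OptionalTest above captures the distinction behind the selectMessageQueues change in ProducerImpl::send: a default-constructed optional is empty, whereas make_optional<std::string>() holds an (empty) string. A minimal sketch of the same distinction follows, using C++17 std::optional as a stand-in on the assumption that absl::optional behaves identically here; `describeGroup` is a hypothetical helper for illustration only.

```cpp
#include <optional>
#include <string>

// Hypothetical helper (not in the SDK): reports what an optional message group carries.
inline std::string describeGroup(const std::optional<std::string>& group) {
  if (!group.has_value()) {
    return "no group";  // Empty optional: the caller specified no message group.
  }
  if (group->empty()) {
    return "empty group";  // Engaged optional holding an empty string.
  }
  return "group=" + *group;
}
```

Passing `std::optional<std::string>{}` reaches the first branch, while `std::make_optional<std::string>()` reaches the second, which is why the two spellings are not interchangeable.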
diff --git a/cpp/source/stats/MetricBidiReactor.cpp b/cpp/source/stats/MetricBidiReactor.cpp
index 22363bd0..e6921378 100644
--- a/cpp/source/stats/MetricBidiReactor.cpp
+++ b/cpp/source/stats/MetricBidiReactor.cpp
@@ -18,10 +18,10 @@
 
 #include <chrono>
 
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
 #include "OpencensusExporter.h"
 #include "Signature.h"
+#include "rocketmq/Logger.h"
+#include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -43,12 +43,15 @@ MetricBidiReactor::MetricBidiReactor(std::weak_ptr<Client> client, std::weak_ptr
     return;
   }
   exporter_ptr->stub()->async()->Export(&context_, this);
+  AddHold();
   StartCall();
 }
 
 void MetricBidiReactor::OnReadDone(bool ok) {
   if (!ok) {
     SPDLOG_WARN("Failed to read response");
+    // match the AddHold() call in MetricBidiReactor::fireRead
+    RemoveHold();
     return;
   }
   SPDLOG_DEBUG("OnReadDone OK");
@@ -56,16 +59,32 @@ void MetricBidiReactor::OnReadDone(bool ok) {
 }
 
 void MetricBidiReactor::OnWriteDone(bool ok) {
+  {
+    bool expected = true;
+    if (!inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
+      SPDLOG_WARN("Illegal command-inflight state");
+      return;
+    }
+  }
+
   if (!ok) {
     SPDLOG_WARN("Failed to report metrics");
+    // match AddHold() call in MetricBidiReactor::MetricBidiReactor
+    RemoveHold();
     return;
   }
   SPDLOG_DEBUG("OnWriteDone OK");
+
+  // If the read stream has not started yet, start it now.
   fireRead();
-  bool expected = true;
-  if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
-    fireWrite();
+
+  // Remove the request that has just been written.
+  {
+    absl::MutexLock lk(&requests_mtx_);
+    requests_.pop_front();
   }
+
+  tryWriteNext();
 }
 
 void MetricBidiReactor::OnDone(const grpc::Status& s) {
@@ -89,13 +108,13 @@ void MetricBidiReactor::write(ExportMetricsServiceRequest request) {
   SPDLOG_DEBUG("Append ExportMetricsServiceRequest to buffer");
   {
     absl::MutexLock lk(&requests_mtx_);
-    requests_.emplace_back(std::move(request));
+    requests_.push_back(std::move(request));
   }
 
-  fireWrite();
+  tryWriteNext();
 }
 
-void MetricBidiReactor::fireWrite() {
+void MetricBidiReactor::tryWriteNext() {
   {
     absl::MutexLock lk(&requests_mtx_);
     if (requests_.empty()) {
@@ -107,16 +126,15 @@ void MetricBidiReactor::fireWrite() {
   bool expected = false;
   if (inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
     absl::MutexLock lk(&requests_mtx_);
-    request_ = std::move(*requests_.begin());
-    requests_.erase(requests_.begin());
     SPDLOG_DEBUG("MetricBidiReactor#StartWrite");
-    StartWrite(&request_);
+    StartWrite(&(requests_.front()));
   }
 }
 
 void MetricBidiReactor::fireRead() {
   bool expected = false;
   if (read_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
+    AddHold();
     StartRead(&response_);
   }
 }
diff --git a/cpp/source/stats/include/MetricBidiReactor.h b/cpp/source/stats/include/MetricBidiReactor.h
index 0a10cd88..e4d75344 100644
--- a/cpp/source/stats/include/MetricBidiReactor.h
+++ b/cpp/source/stats/include/MetricBidiReactor.h
@@ -16,6 +16,8 @@
  */
 #pragma once
 
+#include <list>
+
 #include "Client.h"
 #include "grpcpp/grpcpp.h"
 #include "grpcpp/impl/codegen/client_callback.h"
@@ -25,12 +27,17 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 class OpencensusExporter;
 
-using ExportMetricsServiceRequest = opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
-using ExportMetricsServiceResponse = opencensus::proto::agent::metrics::v1::ExportMetricsServiceResponse;
+using ExportMetricsServiceRequest =
+    opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
+using ExportMetricsServiceResponse =
+    opencensus::proto::agent::metrics::v1::ExportMetricsServiceResponse;
 
-class MetricBidiReactor : public grpc::ClientBidiReactor<ExportMetricsServiceRequest, ExportMetricsServiceResponse> {
+class MetricBidiReactor
+    : public grpc::ClientBidiReactor<ExportMetricsServiceRequest,
+                                     ExportMetricsServiceResponse> {
 public:
-  MetricBidiReactor(std::weak_ptr<Client> client, std::weak_ptr<OpencensusExporter> exporter);
+  MetricBidiReactor(std::weak_ptr<Client> client,
+                    std::weak_ptr<OpencensusExporter> exporter);
 
   /// Notifies the application that a StartRead operation completed.
   ///
@@ -52,7 +59,7 @@ public:
   /// (like failure to remove a hold).
   ///
   /// \param[in] s The status outcome of this RPC
-  void OnDone(const grpc::Status& /*s*/) override;
+  void OnDone(const grpc::Status & /*s*/) override;
 
   void write(ExportMetricsServiceRequest request) LOCKS_EXCLUDED(requests_mtx_);
 
@@ -61,9 +68,8 @@ private:
   std::weak_ptr<OpencensusExporter> exporter_;
   grpc::ClientContext context_;
 
-  ExportMetricsServiceRequest request_;
-
-  std::vector<ExportMetricsServiceRequest> requests_ GUARDED_BY(requests_mtx_);
+  /// Pending ExportMetricsServiceRequest items to write to server
+  std::list<ExportMetricsServiceRequest> requests_ GUARDED_BY(requests_mtx_);
   absl::Mutex requests_mtx_;
 
   std::atomic_bool inflight_{false};
@@ -71,7 +77,7 @@ private:
 
   ExportMetricsServiceResponse response_;
 
-  void fireWrite();
+  void tryWriteNext();
 
   void fireRead();
 };
diff --git a/cpp/tools/trouble_shooting.sh b/cpp/tools/gen_compile_commands.sh
old mode 100644
new mode 100755
similarity index 83%
copy from cpp/tools/trouble_shooting.sh
copy to cpp/tools/gen_compile_commands.sh
index 06d3795f..5ffc239c
--- a/cpp/tools/trouble_shooting.sh
+++ b/cpp/tools/gen_compile_commands.sh
@@ -1,3 +1,4 @@
+#!/usr/bin/env bash
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -12,6 +13,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+TOOLS_DIR=$(dirname "$0")
+WORKSPACE_DIR=$(dirname "$TOOLS_DIR")
+cd $WORKSPACE_DIR
 
-export GRPC_VERBOSITY=debug 
-export GRPC_TRACE=tcp,http,api
\ No newline at end of file
+bazel run @hedron_compile_commands//:refresh_all
\ No newline at end of file
diff --git a/cpp/tools/trouble_shooting.sh b/cpp/tools/trouble_shooting.sh
old mode 100644
new mode 100755
