This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new 156784b5 Develop cpp (#762) 156784b5 is described below commit 156784b540328076c4c43fee242306ac94977dbb Author: Zhanhui Li <lizhan...@gmail.com> AuthorDate: Wed Jun 26 19:07:14 2024 +0800 Develop cpp (#762) * feat: add tagged-release github action pipeline Signed-off-by: Zhanhui Li <lizhan...@gmail.com> * Fix C++ SDK core dump issue (#2) * fix: sync namespace from server Settings * feat: use opentelemetry for tracing/metrics/logging * Remove broken links and add targets to generate compile_commands.json Signed-off-by: lizhanhui <lizhan...@gmail.com> * fix: timer task may invoke a call to a destructing stream Signed-off-by: Zhanhui Li <lizhan...@gmail.com> * fix: update document as we have changed the way to generate compile_commands.json * fix: static_cast StreamState to std::uint8_t as enum class by default is not formattable Signed-off-by: Li Zhanhui <lizhan...@gmail.com> --------- Signed-off-by: lizhanhui <lizhan...@gmail.com> Signed-off-by: Zhanhui Li <lizhan...@gmail.com> Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: fix stream state transition with gRPC reactor Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: revamp TelemetryBidiRecator Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * feat: explicitly control exported symbols Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: release write hold when OnReadDone with ok=false and there is no inflight write Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * feat: revamp TelemetryBidiReactor states and their transition graph Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: example publish message QPS stats lambda * Fifo opt (#732) * Prepare to optimize FIFO publishing Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: SendReceipt now contains std::unique_ptr<Message> being sent Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: add doc explaining why we taking ownership of the message being sent 
Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * feat: implement FifoProducerPartition Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * feat: implement FifoProducerImpl Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * feat: implement builder for FifoProducer Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: prepare to debug Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: log sending sending stages Signed-off-by: Li Zhanhui <lizhan...@gmail.com> --------- Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: spell errors Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: markdown code blocks should be fenced by blank lines Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: copyright header Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: copyright header Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: include statements Signed-off-by: Zhanhui Li <lizhan...@gmail.com> * feat: build example_fifo_producer in BUILD.bazel Signed-off-by: Zhanhui Li <lizhan...@gmail.com> * fix: #713 correct misuse of absl::make_optional Signed-off-by: Zhanhui Li <lizhan...@gmail.com> --------- Signed-off-by: Zhanhui Li <lizhan...@gmail.com> Signed-off-by: lizhanhui <lizhan...@gmail.com> Signed-off-by: Li Zhanhui <lizhan...@gmail.com> --- .github/workflows/cpp_tagged_release.yaml | 57 ++++ .licenserc.yaml | 1 + cpp/CMakeLists.txt | 4 +- cpp/README.md | 23 +- cpp/WORKSPACE | 26 +- cpp/bazel/rocketmq_deps.bzl | 25 +- cpp/examples/BUILD.bazel | 11 + cpp/examples/CMakeLists.txt | 1 + ...oducerWithAsync.cpp => ExampleFifoProducer.cpp} | 71 ++-- cpp/examples/ExampleProducer.cpp | 7 +- cpp/examples/ExampleProducerWithAsync.cpp | 8 +- cpp/examples/ExampleProducerWithFifoMessage.cpp | 10 +- cpp/examples/ExampleProducerWithTimedMessage.cpp | 7 +- .../ExampleProducerWithTransactionalMessage.cpp | 7 +- cpp/examples/ExamplePushConsumer.cpp | 4 +- cpp/examples/ExampleSimpleConsumer.cpp | 4 +- cpp/include/rocketmq/Configuration.h | 6 +- 
cpp/include/rocketmq/FifoProducer.h | 68 ++++ cpp/include/rocketmq/Producer.h | 5 +- cpp/include/rocketmq/SendReceipt.h | 7 +- cpp/source/CMakeLists.txt | 3 + cpp/source/base/Configuration.cpp | 4 +- cpp/source/client/ClientManagerImpl.cpp | 152 +++++---- cpp/source/client/SessionImpl.cpp | 6 +- cpp/source/client/TelemetryBidiReactor.cpp | 372 ++++++++++++++++----- cpp/source/client/include/ClientManager.h | 10 +- cpp/source/client/include/ClientManagerImpl.h | 14 +- .../client/include/SendResult.h} | 14 +- .../client/include/SendResultCallback.h} | 14 +- cpp/source/client/include/TelemetryBidiReactor.h | 110 ++++-- cpp/source/exports.map | 6 + .../rocketmq/FifoContext.cpp} | 21 +- cpp/source/rocketmq/FifoProducer.cpp | 73 ++++ .../rocketmq/FifoProducerImpl.cpp} | 22 +- cpp/source/rocketmq/FifoProducerPartition.cpp | 107 ++++++ cpp/source/rocketmq/Producer.cpp | 4 - cpp/source/rocketmq/ProducerImpl.cpp | 51 +-- cpp/source/rocketmq/SendContext.cpp | 53 +-- cpp/source/rocketmq/include/ClientImpl.h | 4 +- .../rocketmq/include/FifoContext.h} | 17 +- cpp/source/rocketmq/include/FifoProducerImpl.h | 53 +++ .../rocketmq/include/FifoProducerPartition.h | 55 +++ cpp/source/rocketmq/include/ProducerImpl.h | 28 +- cpp/source/rocketmq/include/SendContext.h | 10 +- cpp/source/rocketmq/include/SimpleConsumerImpl.h | 2 +- cpp/source/rocketmq/tests/BUILD.bazel | 8 + .../rocketmq/tests/OptionalTest.cpp} | 21 +- cpp/source/stats/MetricBidiReactor.cpp | 40 ++- cpp/source/stats/include/MetricBidiReactor.h | 24 +- ...trouble_shooting.sh => gen_compile_commands.sh} | 7 +- cpp/tools/trouble_shooting.sh | 0 51 files changed, 1221 insertions(+), 436 deletions(-) diff --git a/.github/workflows/cpp_tagged_release.yaml b/.github/workflows/cpp_tagged_release.yaml new file mode 100644 index 00000000..416676c6 --- /dev/null +++ b/.github/workflows/cpp_tagged_release.yaml @@ -0,0 +1,57 @@ +--- +name: "cpp-tagged-release" + +on: + push: + tags: + - "cpp-*" + +jobs: + tagged-release: + name: "C++ 
Tagged Release" + runs-on: "ubuntu-latest" + + steps: + - uses: actions/checkout@v4 + - uses: actions/checkout@v4 + with: + repository: grpc/grpc + # The branch, tag or SHA to checkout. When checking out the repository that + # triggered a workflow, this defaults to the reference or SHA for that event. + # Otherwise, uses the default branch. + ref: 'v1.46.3' + # Relative path under $GITHUB_WORKSPACE to place the repository + path: cpp/repo/grpc + submodules: true + - name: "Install Dependencies" + run: | + sudo apt-get install -y build-essential autoconf libtool pkg-config cmake git libprotobuf-dev libssl-dev zlib1g-dev libgflags-dev + - name: "Build gRPC" + working-directory: ./cpp/repo/grpc + run: | + mkdir _build && cd _build + cmake -DCMAKE_INSTALL_PREFIX=$HOME/grpc -DgRPC_SSL_PROVIDER=package -DgRPC_ZLIB_PROVIDER=package -DgRPC_PROTOBUF_PACKAGE_TYPE=CONFIG -DgRPC_ZLIB_PROVIDER=package .. + make + make install + - name: "Build Libraries" + working-directory: ./cpp + run: | + mkdir _build && cd _build + cmake .. 
+ make + - name: "Package" + working-directory: ./cpp + run: | + mkdir -p dist/lib + mkdir -p dist/include + cp -r include/rocketmq dist/include/ + cp _build/librocketmq.so dist/lib/ + cp _build/librocketmq.a dist/lib/ + tar -czvf dist.tar.gz dist + - uses: "marvinpinto/action-automatic-releases@latest" + with: + repo_token: "${{ secrets.GITHUB_TOKEN }}" + prerelease: false + automatic_release_tag: cpp + files: | + cpp/dist.tar.gz \ No newline at end of file diff --git a/.licenserc.yaml b/.licenserc.yaml index 4edcd18f..cb5c8747 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -48,6 +48,7 @@ header: - 'cpp/.gitignore' - 'cpp/third_party' - 'cpp/cmake' + - 'cpp/source/exports.map' - 'php/grpc/**/*.php' - 'php/composer.json' - 'rust/.cargo/Cargo.lock.min' diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 374466e6..42f7cd70 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.19) +cmake_minimum_required(VERSION 3.16) project(rocketmq) set(CMAKE_CXX_STANDARD 11) set(CMAKE_POSITION_INDEPENDENT_CODE ON) @@ -31,4 +31,4 @@ if (BUILD_EXAMPLES) find_package(gflags REQUIRED) find_package(ZLIB REQUIRED) add_subdirectory(examples) -endif () \ No newline at end of file +endif () diff --git a/cpp/README.md b/cpp/README.md index 117a2426..a6d983fb 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -217,19 +217,13 @@ if "com_google_googletest" not in native.existing_rules(): 1. VSCode + Clangd [Clangd](https://clangd.llvm.org/) is a really nice code completion tool. Clangd requires compile_commands.json to work properly. - To generate the file, we need clone another repository along with the current one. 
+ To generate the file, run the following command: ```sh - git clone g...@github.com:grailbio/bazel-compilation-database.git + ./tools/gen_compile_commands.sh ``` - From current repository root, - - ```sh - ../bazel-compilation-database/generate.sh - ``` - - Once the script completes, you should have compile_commands.json file in the repository root directory. + Once the script completes, you should have compile_commands.json file in the workspace directory, aka, ${repository}/cpp. LLVM project has an extension for [clangd](https://marketplace.visualstudio.com/items?itemName=llvm-vs-code-extensions.vscode-clangd). Please install it from the extension market. @@ -239,8 +233,15 @@ if "com_google_googletest" not in native.existing_rules(): "C_Cpp.intelliSenseEngine": "Disabled", "C_Cpp.autocomplete": "Disabled", // So you don't get autocomplete from both extensions. "C_Cpp.errorSquiggles": "Disabled", // So you don't get error squiggles from both extensions (clangd's seem to be more reliable anyway). 
- "clangd.path": "/Users/lizhanhui/usr/clangd_12.0.0/bin/clangd", - "clangd.arguments": ["-log=verbose", "-pretty", "--background-index"], + "clangd.path": "/usr/bin/clangd", + "clangd.arguments": [ + "-log=verbose", + "-pretty", + "--background-index", + "--header-insertion=never", + "--compile-commands-dir=${workspaceFolder}/", + "--query-driver=**" + ], "clangd.onConfigChanged": "restart", ``` diff --git a/cpp/WORKSPACE b/cpp/WORKSPACE index d09dd445..3c3d6476 100644 --- a/cpp/WORKSPACE +++ b/cpp/WORKSPACE @@ -27,4 +27,28 @@ http_archive( load("@io_buildbuddy_buildbuddy_toolchain//:deps.bzl", "buildbuddy_deps") buildbuddy_deps() load("@io_buildbuddy_buildbuddy_toolchain//:rules.bzl", "buildbuddy") -buildbuddy(name = "buildbuddy_toolchain") \ No newline at end of file +buildbuddy(name = "buildbuddy_toolchain") + +# Generate compile_commands.json +load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") + + +# Hedron's Compile Commands Extractor for Bazel +# https://github.com/hedronvision/bazel-compile-commands-extractor +http_archive( + name = "hedron_compile_commands", + + # Replace the commit hash (0e990032f3c5a866e72615cf67e5ce22186dcb97) in both places (below) with the latest (https://github.com/hedronvision/bazel-compile-commands-extractor/commits/main), rather than using the stale one here. + # Even better, set up Renovate and let it do the work for you (see "Suggestion: Updates" in the README). + url = "https://github.com/hedronvision/bazel-compile-commands-extractor/archive/204aa593e002cbd177d30f11f54cff3559110bb9.tar.gz", + strip_prefix = "bazel-compile-commands-extractor-204aa593e002cbd177d30f11f54cff3559110bb9", + # When you first run this tool, it'll recommend a sha256 hash to put here with a message like: "DEBUG: Rule 'hedron_compile_commands' indicated that a canonical reproducible form can be obtained by modifying arguments sha256 = ..." 
+) +load("@hedron_compile_commands//:workspace_setup.bzl", "hedron_compile_commands_setup") +hedron_compile_commands_setup() +load("@hedron_compile_commands//:workspace_setup_transitive.bzl", "hedron_compile_commands_setup_transitive") +hedron_compile_commands_setup_transitive() +load("@hedron_compile_commands//:workspace_setup_transitive_transitive.bzl", "hedron_compile_commands_setup_transitive_transitive") +hedron_compile_commands_setup_transitive_transitive() +load("@hedron_compile_commands//:workspace_setup_transitive_transitive_transitive.bzl", "hedron_compile_commands_setup_transitive_transitive_transitive") +hedron_compile_commands_setup_transitive_transitive_transitive() \ No newline at end of file diff --git a/cpp/bazel/rocketmq_deps.bzl b/cpp/bazel/rocketmq_deps.bzl index eae31a6f..684e55eb 100644 --- a/cpp/bazel/rocketmq_deps.bzl +++ b/cpp/bazel/rocketmq_deps.bzl @@ -16,7 +16,6 @@ def rocketmq_deps(): sha256 = "b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5", strip_prefix = "googletest-release-1.11.0", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googletest/googletest-release-1.11.0.tar.gz", "https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz", ], ) @@ -27,7 +26,6 @@ def rocketmq_deps(): strip_prefix = "filesystem-1.5.0", sha256 = "eb6f3b0739908ad839cde68885d70e7324db191b9fad63d9915beaa40444d9cb", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/filesystem/filesystem-1.5.0.tar.gz", "https://github.com/gulrak/filesystem/archive/v1.5.0.tar.gz", ], build_file = "@org_apache_rocketmq//third_party:filesystem.BUILD", @@ -39,7 +37,6 @@ def rocketmq_deps(): strip_prefix = "spdlog-1.9.2", sha256 = "6fff9215f5cb81760be4cc16d033526d1080427d236e86d70bb02994f85e3d38", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/spdlog/spdlog-1.9.2.tar.gz", "https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz", ], build_file = "@org_apache_rocketmq//third_party:spdlog.BUILD", @@ 
-51,7 +48,6 @@ def rocketmq_deps(): strip_prefix = "fmt-8.0.1", sha256 = "b06ca3130158c625848f3fb7418f235155a4d389b2abc3a6245fb01cb0eb1e01", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/fmt/fmt-8.0.1.tar.gz", "https://github.com/fmtlib/fmt/archive/refs/tags/8.0.1.tar.gz", ], build_file = "@org_apache_rocketmq//third_party:fmtlib.BUILD", @@ -63,7 +59,6 @@ def rocketmq_deps(): sha256 = "8b28fdd45bab62d15db232ec404248901842e5340299a57765e48abe8a80d930", strip_prefix = "protobuf-3.20.1", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/protobuf/protobuf-3.20.1.tar.gz", "https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.20.1.tar.gz", ], ) @@ -74,7 +69,6 @@ def rocketmq_deps(): sha256 = "507e38c8d95c7efa4f3b1c0595a8e8f139c885cb41a76cab7e20e4e67ae87731", strip_prefix = "rules_proto_grpc-4.1.1", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto_grpc/rules_proto_grpc-4.1.1.tar.gz", "https://github.com/rules-proto-grpc/rules_proto_grpc/archive/refs/tags/4.1.1.tar.gz", ], ) @@ -84,7 +78,6 @@ def rocketmq_deps(): name = "io_opencensus_cpp", sha256 = "317f2bfdaba469561c7e64b1a55282b87e677c109c9d8877097940e6d5cbca08", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/opencensus-cpp/opencensus-cpp-0.4.1.tar.gz", "https://github.com/lizhanhui/opencensus-cpp/archive/refs/tags/v0.4.1.tar.gz", ], strip_prefix = "opencensus-cpp-0.4.1", @@ -96,7 +89,6 @@ def rocketmq_deps(): sha256 = "dcf71b9cba8dc0ca9940c4b316a0c796be8fab42b070bb6b7cab62b48f0e66c4", strip_prefix = "abseil-cpp-20211102.0", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/abseil/abseil-cpp-20211102.0.tar.gz", "https://github.com/abseil/abseil-cpp/archive/refs/tags/20211102.0.tar.gz", ], ) @@ -107,7 +99,6 @@ def rocketmq_deps(): strip_prefix = "gflags-2.2.2", sha256 = "34af2f15cf7367513b352bdcd2493ab14ce43692d2dcd9dfc499492966c64dcf", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/gflags/gflags-2.2.2.tar.gz", 
"https://github.com/gflags/gflags/archive/refs/tags/v2.2.2.tar.gz", ], ) @@ -118,7 +109,6 @@ def rocketmq_deps(): strip_prefix = "grpc-1.46.3", sha256 = "d6cbf22cb5007af71b61c6be316a79397469c58c82a942552a62e708bce60964", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/grpc/grpc-1.46.3.tar.gz", "https://github.com/grpc/grpc/archive/refs/tags/v1.46.3.tar.gz", ], ) @@ -130,7 +120,6 @@ def rocketmq_deps(): build_file = "@org_apache_rocketmq//third_party:asio.BUILD", strip_prefix = "asio-1.18.2", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/asio/asio-1.18.2.tar.gz", "https://github.com/lizhanhui/asio/archive/refs/tags/v1.18.2.tar.gz", ], ) @@ -140,7 +129,6 @@ def rocketmq_deps(): name = "com_google_googleapis", sha256 = "e89f15d54b0ddab0cd41d18cb2299e5447db704e2b05ff141cb1769170671466", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googleapis/googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95.zip", "https://github.com/googleapis/googleapis/archive/af7fb72df59a814221b123a4d1acb3f6c3e6cc95.zip", ], strip_prefix = "googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95", @@ -152,7 +140,6 @@ def rocketmq_deps(): sha256 = "cdf6b84084aad8f10bf20b46b77cb48d83c319ebe6458a18e9d2cebf57807cdd", strip_prefix = "rules_python-0.8.1", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-python/rules_python-0.8.1.tar.gz", "https://github.com/bazelbuild/rules_python/archive/refs/tags/0.8.1.tar.gz", ], ) @@ -161,7 +148,6 @@ def rocketmq_deps(): http_archive, name = "rules_swift", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_swift/rules_swift-0.27.0.tar.gz", "https://github.com/bazelbuild/rules_swift/archive/refs/tags/0.27.0.tar.gz", ], strip_prefix = "rules_swift-0.27.0", @@ -172,7 +158,6 @@ def rocketmq_deps(): name = "io_bazel_rules_go", sha256 = "685052b498b6ddfe562ca7a97736741d87916fe536623afb7da2824c0211c369", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-go/rules_go-v0.33.0.zip", 
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.33.0/rules_go-v0.33.0.zip", "https://github.com/bazelbuild/rules_go/releases/download/v0.33.0/rules_go-v0.33.0.zip", ], @@ -184,7 +169,15 @@ def rocketmq_deps(): sha256 = "e017528fd1c91c5a33f15493e3a398181a9e821a804eb7ff5acdd1d2d6c2b18d", strip_prefix = "rules_proto-4.0.0-3.20.0", urls = [ - "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto/rules_proto-4.0.0-3.20.0.tar.gz", "https://github.com/bazelbuild/rules_proto/archive/refs/tags/4.0.0-3.20.0.tar.gz", ], ) + + maybe( + http_archive, + name = "com_github_opentelemetry", + strip_prefix = "opentelemetry-cpp-1.14.2", + urls = [ + "https://github.com/open-telemetry/opentelemetry-cpp/archive/refs/tags/v1.14.2.tar.gz" + ] + ) diff --git a/cpp/examples/BUILD.bazel b/cpp/examples/BUILD.bazel index 5113e150..771b0e04 100644 --- a/cpp/examples/BUILD.bazel +++ b/cpp/examples/BUILD.bazel @@ -91,4 +91,15 @@ cc_binary( "//source/rocketmq:rocketmq_library", "@com_github_gflags_gflags//:gflags", ], +) + +cc_binary( + name = "example_fifo_producer", + srcs = [ + "ExampleFifoProducer.cpp", + ], + deps = [ + "//source/rocketmq:rocketmq_library", + "@com_github_gflags_gflags//:gflags", + ] ) \ No newline at end of file diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt index 8d6b0399..27304477 100644 --- a/cpp/examples/CMakeLists.txt +++ b/cpp/examples/CMakeLists.txt @@ -4,6 +4,7 @@ function(add_example name file) endfunction() add_example(example_producer ExampleProducer.cpp) +add_example(example_fifo_producer ExampleFifoProducer.cpp) add_example(example_producer_with_async ExampleProducerWithAsync.cpp) add_example(example_producer_with_fifo_message ExampleProducerWithFifoMessage.cpp) add_example(example_producer_with_timed_message ExampleProducerWithTimedMessage.cpp) diff --git a/cpp/examples/ExampleProducerWithAsync.cpp b/cpp/examples/ExampleFifoProducer.cpp similarity index 73% copy from 
cpp/examples/ExampleProducerWithAsync.cpp copy to cpp/examples/ExampleFifoProducer.cpp index 62ee7781..9d99be36 100644 --- a/cpp/examples/ExampleProducerWithAsync.cpp +++ b/cpp/examples/ExampleFifoProducer.cpp @@ -17,16 +17,19 @@ #include <algorithm> #include <atomic> #include <condition_variable> -#include <cstdint> #include <iostream> -#include <mutex> +#include <memory> #include <random> #include <string> #include <system_error> #include "gflags/gflags.h" +#include "rocketmq/CredentialsProvider.h" +#include "rocketmq/FifoProducer.h" +#include "rocketmq/Logger.h" #include "rocketmq/Message.h" #include "rocketmq/Producer.h" +#include "rocketmq/SendReceipt.h" using namespace ROCKETMQ_NAMESPACE; @@ -94,30 +97,33 @@ DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are publi DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider"); DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); -DEFINE_uint32(concurrency, 128, "Concurrency of async send"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); +DEFINE_uint32(concurrency, 16, "Concurrency of FIFO producer"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); auto& logger = getLogger(); - logger.setConsoleLevel(Level::Info); - logger.setLevel(Level::Info); + logger.setConsoleLevel(Level::Debug); + logger.setLevel(Level::Debug); logger.init(); + // Access Key/Secret pair may be acquired from management console CredentialsProviderPtr credentials_provider; if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) { credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret); } - // In most case, you don't need to create too many producers, singletion pattern is recommended. 
- auto producer = Producer::newBuilder() + // In most case, you don't need to create too many producers, singleton pattern is recommended. + auto producer = FifoProducer::newBuilder() .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) + .withConcurrency(FLAGS_concurrency) .withTopics({FLAGS_topic}) .build(); @@ -127,8 +133,8 @@ int main(int argc, char* argv[]) { auto stats_lambda = [&] { while (!stopped.load(std::memory_order_relaxed)) { long cnt = count.load(std::memory_order_relaxed); - while (count.compare_exchange_weak(cnt, 0)) { - break; + while (!count.compare_exchange_weak(cnt, 0)) { + cnt = count.load(std::memory_order_relaxed); } std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "QPS: " << cnt << std::endl; @@ -145,25 +151,32 @@ int main(int argc, char* argv[]) { std::unique_ptr<Semaphore> semaphore(new Semaphore(FLAGS_concurrency)); - auto send_callback = [&](const std::error_code& ec, const SendReceipt& receipt) { - std::unique_lock<std::mutex> lk(mtx); - semaphore->release(); - completed++; - count++; - if (completed >= FLAGS_total) { - cv.notify_all(); + try { + for (std::size_t i = 0; i < FLAGS_total; ++i) { + auto message = Message::newBuilder() + .withTopic(FLAGS_topic) + .withTag("TagA") + .withKeys({"Key-" + std::to_string(i)}) + .withGroup("message-group" + std::to_string(i % FLAGS_concurrency)) + .withBody(body) + .build(); + std::error_code ec; + auto callback = [&](const std::error_code& ec, const SendReceipt& receipt) mutable { + completed++; + count++; + semaphore->release(); + + if (completed >= FLAGS_total) { + cv.notify_all(); + } + }; + + semaphore->acquire(); + producer.send(std::move(message), callback); + std::cout << "Cached No." 
<< i << " message" << std::endl; } - }; - - for (std::size_t i = 0; i < FLAGS_total; ++i) { - auto message = Message::newBuilder() - .withTopic(FLAGS_topic) - .withTag("TagA") - .withKeys({"Key-" + std::to_string(i)}) - .withBody(body) - .build(); - semaphore->acquire(); - producer.send(std::move(message), send_callback); + } catch (...) { + std::cerr << "Ah...No!!!" << std::endl; } { @@ -178,4 +191,4 @@ int main(int argc, char* argv[]) { } return EXIT_SUCCESS; -} \ No newline at end of file +} diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp index 57293c24..5e20cc12 100644 --- a/cpp/examples/ExampleProducer.cpp +++ b/cpp/examples/ExampleProducer.cpp @@ -57,6 +57,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -77,7 +78,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .build(); @@ -88,8 +89,8 @@ int main(int argc, char* argv[]) { auto stats_lambda = [&] { while (!stopped.load(std::memory_order_relaxed)) { long cnt = count.load(std::memory_order_relaxed); - while (count.compare_exchange_weak(cnt, 0)) { - break; + while (!count.compare_exchange_weak(cnt, 0)) { + cnt = count.load(std::memory_order_relaxed); } std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "QPS: " << cnt << std::endl; diff --git a/cpp/examples/ExampleProducerWithAsync.cpp b/cpp/examples/ExampleProducerWithAsync.cpp index 62ee7781..d88dfc85 100644 --- a/cpp/examples/ExampleProducerWithAsync.cpp +++ 
b/cpp/examples/ExampleProducerWithAsync.cpp @@ -17,7 +17,6 @@ #include <algorithm> #include <atomic> #include <condition_variable> -#include <cstdint> #include <iostream> #include <mutex> #include <random> @@ -97,6 +96,7 @@ DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_uint32(concurrency, 128, "Concurrency of async send"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -116,7 +116,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .build(); @@ -127,8 +127,8 @@ int main(int argc, char* argv[]) { auto stats_lambda = [&] { while (!stopped.load(std::memory_order_relaxed)) { long cnt = count.load(std::memory_order_relaxed); - while (count.compare_exchange_weak(cnt, 0)) { - break; + while (!count.compare_exchange_weak(cnt, 0)) { + cnt = count.load(std::memory_order_relaxed); } std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "QPS: " << cnt << std::endl; diff --git a/cpp/examples/ExampleProducerWithFifoMessage.cpp b/cpp/examples/ExampleProducerWithFifoMessage.cpp index 09b8d407..4fa34f9d 100644 --- a/cpp/examples/ExampleProducerWithFifoMessage.cpp +++ b/cpp/examples/ExampleProducerWithFifoMessage.cpp @@ -54,6 +54,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -74,7 +75,7 @@ int main(int argc, char* argv[]) { 
.withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .build(); @@ -83,10 +84,11 @@ int main(int argc, char* argv[]) { std::atomic_long count(0); auto stats_lambda = [&] { + std::cout << "Stats thread starts" << std::endl; while (!stopped.load(std::memory_order_relaxed)) { long cnt = count.load(std::memory_order_relaxed); - while (count.compare_exchange_weak(cnt, 0)) { - break; + while (!count.compare_exchange_weak(cnt, 0)) { + cnt = count.load(std::memory_order_relaxed); } std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "QPS: " << cnt << std::endl; @@ -109,7 +111,7 @@ int main(int argc, char* argv[]) { .build(); std::error_code ec; SendReceipt send_receipt = producer.send(std::move(message), ec); - std::cout << "Message-ID: " << send_receipt.message_id << std::endl; + // std::cout << "Message-ID: " << send_receipt.message_id << std::endl; count++; } } catch (...) 
{ diff --git a/cpp/examples/ExampleProducerWithTimedMessage.cpp b/cpp/examples/ExampleProducerWithTimedMessage.cpp index 8f12f5b6..d6237459 100644 --- a/cpp/examples/ExampleProducerWithTimedMessage.cpp +++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp @@ -56,6 +56,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -75,7 +76,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .build(); @@ -86,8 +87,8 @@ int main(int argc, char* argv[]) { auto stats_lambda = [&] { while (!stopped.load(std::memory_order_relaxed)) { long cnt = count.load(std::memory_order_relaxed); - while (count.compare_exchange_weak(cnt, 0)) { - break; + while (!count.compare_exchange_weak(cnt, 0)) { + cnt = count.load(std::memory_order_relaxed); } std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "QPS: " << cnt << std::endl; diff --git a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp index befb18ca..50620c5a 100644 --- a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp +++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp @@ -54,6 +54,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { 
gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -79,7 +80,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .withTransactionChecker(checker) @@ -91,8 +92,8 @@ int main(int argc, char* argv[]) { auto stats_lambda = [&] { while (!stopped.load(std::memory_order_relaxed)) { long cnt = count.load(std::memory_order_relaxed); - while (count.compare_exchange_weak(cnt, 0)) { - break; + while (!count.compare_exchange_weak(cnt, 0)) { + cnt = count.load(std::memory_order_relaxed); } std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "QPS: " << cnt << std::endl; diff --git a/cpp/examples/ExamplePushConsumer.cpp b/cpp/examples/ExamplePushConsumer.cpp index 1e20b2ee..66a85f4b 100644 --- a/cpp/examples/ExamplePushConsumer.cpp +++ b/cpp/examples/ExamplePushConsumer.cpp @@ -16,7 +16,6 @@ */ #include <chrono> #include <iostream> -#include <mutex> #include <thread> #include "gflags/gflags.h" @@ -30,6 +29,7 @@ DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provide DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -58,7 +58,7 @@ int main(int argc, char* argv[]) { .withEndpoints(FLAGS_access_point) .withRequestTimeout(std::chrono::seconds(3)) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .withConsumeThreads(4) .withListener(listener) diff --git a/cpp/examples/ExampleSimpleConsumer.cpp b/cpp/examples/ExampleSimpleConsumer.cpp index 4c30214f..aedec71e 100644 --- 
a/cpp/examples/ExampleSimpleConsumer.cpp +++ b/cpp/examples/ExampleSimpleConsumer.cpp @@ -16,7 +16,6 @@ */ #include <chrono> #include <iostream> -#include <thread> #include "gflags/gflags.h" #include "rocketmq/Logger.h" @@ -29,6 +28,7 @@ DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provide DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); +DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -51,7 +51,7 @@ int main(int argc, char* argv[]) { .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) - .withSsl(true) + .withSsl(FLAGS_tls) .build()) .subscribe(FLAGS_topic, tag) .build(); diff --git a/cpp/include/rocketmq/Configuration.h b/cpp/include/rocketmq/Configuration.h index 0037c270..6dcd4137 100644 --- a/cpp/include/rocketmq/Configuration.h +++ b/cpp/include/rocketmq/Configuration.h @@ -44,7 +44,7 @@ public: } bool withSsl() const { - return withSsl_; + return tls_; } protected: @@ -56,7 +56,7 @@ private: std::string endpoints_; CredentialsProviderPtr credentials_provider_; std::chrono::milliseconds request_timeout_{ConfigurationDefaults::RequestTimeout}; - bool withSsl_ = true; + bool tls_ = true; }; class ConfigurationBuilder { @@ -67,7 +67,7 @@ public: ConfigurationBuilder& withRequestTimeout(std::chrono::milliseconds request_timeout); - ConfigurationBuilder& withSsl(bool enable); + ConfigurationBuilder& withSsl(bool with_ssl); Configuration build(); diff --git a/cpp/include/rocketmq/FifoProducer.h b/cpp/include/rocketmq/FifoProducer.h new file mode 100644 index 00000000..1ee20d91 --- /dev/null +++ b/cpp/include/rocketmq/FifoProducer.h @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <cstddef> +#include <memory> +#include <vector> + +#include "Configuration.h" +#include "Message.h" +#include "RocketMQ.h" +#include "SendCallback.h" + +ROCKETMQ_NAMESPACE_BEGIN + +class FifoProducerImpl; +class FifoProducerBuilder; +class ProducerImpl; + +class FifoProducer { +public: + static FifoProducerBuilder newBuilder(); + + void send(MessageConstPtr message, SendCallback callback); + +private: + std::shared_ptr<FifoProducerImpl> impl_; + + explicit FifoProducer(std::shared_ptr<FifoProducerImpl> impl) : impl_(std::move(impl)) { + } + + void start(); + + friend class FifoProducerBuilder; +}; + +class FifoProducerBuilder { +public: + FifoProducerBuilder(); + + FifoProducerBuilder& withConfiguration(Configuration configuration); + + FifoProducerBuilder& withTopics(const std::vector<std::string>& topics); + + FifoProducerBuilder& withConcurrency(std::size_t concurrency); + + FifoProducer build(); + +private: + std::shared_ptr<FifoProducerImpl> impl_; + std::shared_ptr<ProducerImpl> producer_impl_; +}; + +ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h index 42004eb3..6b42843d 100644 --- 
a/cpp/include/rocketmq/Producer.h +++ b/cpp/include/rocketmq/Producer.h @@ -16,20 +16,17 @@ */ #pragma once -#include <chrono> -#include <functional> #include <memory> #include <system_error> #include <vector> #include "Configuration.h" -#include "ErrorCode.h" -#include "Logger.h" #include "Message.h" #include "SendCallback.h" #include "SendReceipt.h" #include "Transaction.h" #include "TransactionChecker.h" +#include "rocketmq/Logger.h" ROCKETMQ_NAMESPACE_BEGIN diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/include/rocketmq/SendReceipt.h index 489df5ec..7eef6e79 100644 --- a/cpp/include/rocketmq/SendReceipt.h +++ b/cpp/include/rocketmq/SendReceipt.h @@ -16,20 +16,21 @@ */ #pragma once -#include <cstdint> #include <string> -#include <utility> #include "RocketMQ.h" +#include "rocketmq/Message.h" ROCKETMQ_NAMESPACE_BEGIN struct SendReceipt { + std::string target; + std::string message_id; std::string transaction_id; - std::string target; + MessageConstPtr message; }; ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/CMakeLists.txt b/cpp/source/CMakeLists.txt index d42c6a1a..dbf5ab01 100644 --- a/cpp/source/CMakeLists.txt +++ b/cpp/source/CMakeLists.txt @@ -53,7 +53,10 @@ target_link_libraries(rocketmq_shared opencensus::stats opencensus_proto spdlog) +set(VERSION_SCRIPT ${CMAKE_CURRENT_SOURCE_DIR}/exports.map) +set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -Wl,--version-script=${VERSION_SCRIPT}") set_target_properties(rocketmq_shared PROPERTIES + LINK_DEPENDS ${VERSION_SCRIPT} LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR} LIBRARY_OUTPUT_NAME rocketmq) \ No newline at end of file diff --git a/cpp/source/base/Configuration.cpp b/cpp/source/base/Configuration.cpp index 2a136d5d..66cff2e8 100644 --- a/cpp/source/base/Configuration.cpp +++ b/cpp/source/base/Configuration.cpp @@ -38,8 +38,8 @@ ConfigurationBuilder& ConfigurationBuilder::withRequestTimeout(std::chrono::mill return *this; } -ConfigurationBuilder& 
ConfigurationBuilder::withSsl(bool enable) { - configuration_.withSsl_ = enable; +ConfigurationBuilder& ConfigurationBuilder::withSsl(bool with_ssl) { + configuration_.tls_ = with_ssl; return *this; } diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp index 5865dbb2..7d724c7b 100644 --- a/cpp/source/client/ClientManagerImpl.cpp +++ b/cpp/source/client/ClientManagerImpl.cpp @@ -24,34 +24,30 @@ #include <utility> #include <vector> -#include "apache/rocketmq/v2/definition.pb.h" #include "InvocationContext.h" #include "LogInterceptor.h" #include "LogInterceptorFactory.h" -#include "rocketmq/Logger.h" -#include "spdlog/spdlog.h" -#include "MessageExt.h" -#include "MetadataConstants.h" #include "MixAll.h" #include "Protocol.h" #include "ReceiveMessageContext.h" #include "RpcClient.h" #include "RpcClientImpl.h" #include "Scheduler.h" -#include "TlsHelper.h" +#include "SchedulerImpl.h" #include "UtilAll.h" #include "google/protobuf/util/time_util.h" #include "grpcpp/create_channel.h" #include "rocketmq/ErrorCode.h" -#include "rocketmq/SendReceipt.h" +#include "spdlog/spdlog.h" ROCKETMQ_NAMESPACE_BEGIN -ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool withSsl) - : scheduler_(std::make_shared<SchedulerImpl>()), resource_namespace_(std::move(resource_namespace)), +ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_ssl) + : scheduler_(std::make_shared<SchedulerImpl>()), + resource_namespace_(std::move(resource_namespace)), state_(State::CREATED), callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())), - withSsl_(withSsl){ + with_ssl_(with_ssl) { certificate_verifier_ = grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>(); tls_channel_credential_options_.set_verify_server_certs(false); tls_channel_credential_options_.set_check_call_host(false); @@ -175,8 +171,10 @@ std::vector<std::string> 
ClientManagerImpl::cleanOfflineRpcClients() { return removed; } -void ClientManagerImpl::heartbeat(const std::string& target_host, const Metadata& metadata, - const HeartbeatRequest& request, std::chrono::milliseconds timeout, +void ClientManagerImpl::heartbeat(const std::string& target_host, + const Metadata& metadata, + const HeartbeatRequest& request, + std::chrono::milliseconds timeout, const std::function<void(const std::error_code&, const HeartbeatResponse&)>& cb) { SPDLOG_DEBUG("Prepare to send heartbeat to {}. Request: {}", target_host, request.DebugString()); auto client = getRpcClient(target_host, true); @@ -279,8 +277,10 @@ void ClientManagerImpl::doHeartbeat() { } } -bool ClientManagerImpl::send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request, - SendCallback cb) { +bool ClientManagerImpl::send(const std::string& target_host, + const Metadata& metadata, + SendMessageRequest& request, + SendResultCallback cb) { assert(cb); SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.DebugString()); RpcClientSharedPtr client = getRpcClient(target_host); @@ -306,15 +306,14 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met return; } - SendReceipt send_receipt = {}; - send_receipt.target = target_host; - std::error_code ec; + SendResult send_result = {}; + send_result.target = target_host; if (!invocation_context->status.ok()) { SPDLOG_WARN("Failed to send message to {} due to gRPC error. 
gRPC code: {}, gRPC error message: {}", invocation_context->remote_address, invocation_context->status.error_code(), invocation_context->status.error_message()); - ec = ErrorCode::RequestTimeout; - cb(ec, send_receipt); + send_result.ec = ErrorCode::RequestTimeout; + cb(send_result); return; } @@ -323,8 +322,8 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met case rmq::Code::OK: { if (!invocation_context->response.entries().empty()) { auto first = invocation_context->response.entries().begin(); - send_receipt.message_id = first->message_id(); - send_receipt.transaction_id = first->transaction_id(); + send_result.message_id = first->message_id(); + send_result.transaction_id = first->transaction_id(); } else { SPDLOG_ERROR("Unexpected send-message-response: {}", invocation_context->response.DebugString()); } @@ -333,126 +332,127 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met case rmq::Code::ILLEGAL_TOPIC: { SPDLOG_ERROR("IllegalTopic: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::IllegalTopic; + send_result.ec = ErrorCode::IllegalTopic; break; } case rmq::Code::ILLEGAL_MESSAGE_TAG: { SPDLOG_ERROR("IllegalMessageTag: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::IllegalMessageTag; + send_result.ec = ErrorCode::IllegalMessageTag; break; } case rmq::Code::ILLEGAL_MESSAGE_KEY: { SPDLOG_ERROR("IllegalMessageKey: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::IllegalMessageKey; + send_result.ec = ErrorCode::IllegalMessageKey; break; } case rmq::Code::ILLEGAL_MESSAGE_GROUP: { SPDLOG_ERROR("IllegalMessageGroup: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::IllegalMessageGroup; + send_result.ec = ErrorCode::IllegalMessageGroup; break; } case rmq::Code::ILLEGAL_MESSAGE_PROPERTY_KEY: { SPDLOG_ERROR("IllegalMessageProperty: {}. 
Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::IllegalMessageProperty; + send_result.ec = ErrorCode::IllegalMessageProperty; break; } case rmq::Code::MESSAGE_PROPERTIES_TOO_LARGE: { SPDLOG_ERROR("MessagePropertiesTooLarge: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::MessagePropertiesTooLarge; + send_result.ec = ErrorCode::MessagePropertiesTooLarge; break; } case rmq::Code::MESSAGE_BODY_TOO_LARGE: { SPDLOG_ERROR("MessageBodyTooLarge: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::MessageBodyTooLarge; + send_result.ec = ErrorCode::MessageBodyTooLarge; break; } case rmq::Code::TOPIC_NOT_FOUND: { SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::TopicNotFound; + send_result.ec = ErrorCode::TopicNotFound; break; } case rmq::Code::NOT_FOUND: { SPDLOG_WARN("NotFound: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::NotFound; + send_result.ec = ErrorCode::NotFound; break; } case rmq::Code::UNAUTHORIZED: { SPDLOG_WARN("Unauthenticated: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::Unauthorized; + send_result.ec = ErrorCode::Unauthorized; break; } - + case rmq::Code::FORBIDDEN: { SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::Forbidden; + send_result.ec = ErrorCode::Forbidden; break; } case rmq::Code::MESSAGE_CORRUPTED: { SPDLOG_WARN("MessageCorrupted: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::MessageCorrupted; + send_result.ec = ErrorCode::MessageCorrupted; break; } case rmq::Code::TOO_MANY_REQUESTS: { SPDLOG_WARN("TooManyRequest: {}. 
Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::TooManyRequests; + send_result.ec = ErrorCode::TooManyRequests; break; } case rmq::Code::INTERNAL_SERVER_ERROR: { SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::InternalServerError; + send_result.ec = ErrorCode::InternalServerError; break; } case rmq::Code::HA_NOT_AVAILABLE: { SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::InternalServerError; + send_result.ec = ErrorCode::InternalServerError; break; } case rmq::Code::PROXY_TIMEOUT: { SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::GatewayTimeout; + send_result.ec = ErrorCode::GatewayTimeout; break; } case rmq::Code::MASTER_PERSISTENCE_TIMEOUT: { SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::GatewayTimeout; + send_result.ec = ErrorCode::GatewayTimeout; break; } case rmq::Code::SLAVE_PERSISTENCE_TIMEOUT: { SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::GatewayTimeout; + send_result.ec = ErrorCode::GatewayTimeout; break; } case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: { - SPDLOG_WARN("Message-property-conflict-with-type: Host={}, Response={}", invocation_context->remote_address, invocation_context->response.DebugString()); - ec = ErrorCode::MessagePropertyConflictWithType; + SPDLOG_WARN("Message-property-conflict-with-type: Host={}, Response={}", invocation_context->remote_address, + invocation_context->response.DebugString()); + send_result.ec = ErrorCode::MessagePropertyConflictWithType; break; } default: { SPDLOG_WARN("NotSupported: Check and upgrade SDK to the latest. 
Host={}", invocation_context->remote_address); - ec = ErrorCode::NotSupported; + send_result.ec = ErrorCode::NotSupported; break; } } - cb(ec, send_receipt); + cb(send_result); }; invocation_context->callback = completion_callback; @@ -470,7 +470,8 @@ std::shared_ptr<grpc::Channel> ClientManagerImpl::createChannel(const std::strin std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories; interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>()); auto channel = grpc::experimental::CreateCustomChannelWithInterceptors( - target_host, withSsl_ ? channel_credential_ : grpc::InsecureChannelCredentials(), channel_arguments_, std::move(interceptor_factories)); + target_host, with_ssl_ ? channel_credential_ : grpc::InsecureChannelCredentials(), channel_arguments_, + std::move(interceptor_factories)); return channel; } @@ -513,27 +514,28 @@ void ClientManagerImpl::cleanRpcClients() { rpc_clients_.clear(); } -SendReceipt ClientManagerImpl::processSendResponse(const rmq::MessageQueue& message_queue, - const SendMessageResponse& response, std::error_code& ec) { - SendReceipt send_receipt; +SendResult ClientManagerImpl::processSendResponse(const rmq::MessageQueue& message_queue, + const SendMessageResponse& response, + std::error_code& ec) { + SendResult send_result; switch (response.status().code()) { case rmq::Code::OK: { assert(response.entries_size() > 0); - send_receipt.message_id = response.entries().begin()->message_id(); - send_receipt.transaction_id = response.entries().begin()->transaction_id(); - return send_receipt; + send_result.message_id = response.entries().begin()->message_id(); + send_result.transaction_id = response.entries().begin()->transaction_id(); + return send_result; } case rmq::Code::ILLEGAL_TOPIC: { ec = ErrorCode::BadRequest; - return send_receipt; + return send_result; } default: { // TODO: handle other cases. 
break; } } - return send_receipt; + return send_result; } void ClientManagerImpl::addClientObserver(std::weak_ptr<Client> client) { @@ -541,8 +543,10 @@ void ClientManagerImpl::addClientObserver(std::weak_ptr<Client> client) { clients_.emplace_back(std::move(client)); } -void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metadata& metadata, - const QueryRouteRequest& request, std::chrono::milliseconds timeout, +void ClientManagerImpl::resolveRoute(const std::string& target_host, + const Metadata& metadata, + const QueryRouteRequest& request, + std::chrono::milliseconds timeout, const std::function<void(const std::error_code&, const TopicRouteDataPtr&)>& cb) { SPDLOG_DEBUG("Name server connection URL: {}", target_host); SPDLOG_DEBUG("Query route request: {}", request.DebugString()); @@ -646,7 +650,9 @@ void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metad } void ClientManagerImpl::queryAssignment( - const std::string& target, const Metadata& metadata, const QueryAssignmentRequest& request, + const std::string& target, + const Metadata& metadata, + const QueryAssignmentRequest& request, std::chrono::milliseconds timeout, const std::function<void(const std::error_code&, const QueryAssignmentResponse&)>& cb) { SPDLOG_DEBUG("Prepare to send query assignment request to broker[address={}]", target); @@ -748,8 +754,10 @@ void ClientManagerImpl::queryAssignment( client->asyncQueryAssignment(request, invocation_context); } -void ClientManagerImpl::receiveMessage(const std::string& target_host, const Metadata& metadata, - const ReceiveMessageRequest& request, std::chrono::milliseconds timeout, +void ClientManagerImpl::receiveMessage(const std::string& target_host, + const Metadata& metadata, + const ReceiveMessageRequest& request, + std::chrono::milliseconds timeout, ReceiveMessageCallback cb) { SPDLOG_DEBUG("Prepare to receive message from {} asynchronously. 
Request: {}", target_host, request.DebugString()); RpcClientSharedPtr client = getRpcClient(target_host); @@ -765,7 +773,6 @@ State ClientManagerImpl::state() const { } MessageConstSharedPtr ClientManagerImpl::wrapMessage(const rmq::Message& item) { - assert(item.topic().resource_namespace() == resource_namespace_); auto builder = Message::newBuilder(); // base @@ -955,8 +962,11 @@ SchedulerSharedPtr ClientManagerImpl::getScheduler() { return scheduler_; } -void ClientManagerImpl::ack(const std::string& target, const Metadata& metadata, const AckMessageRequest& request, - std::chrono::milliseconds timeout, const std::function<void(const std::error_code&)>& cb) { +void ClientManagerImpl::ack(const std::string& target, + const Metadata& metadata, + const AckMessageRequest& request, + std::chrono::milliseconds timeout, + const std::function<void(const std::error_code&)>& cb) { std::string target_host(target.data(), target.length()); SPDLOG_DEBUG("Prepare to ack message against {} asynchronously. 
AckMessageRequest: {}", target_host, request.DebugString()); @@ -1066,8 +1076,11 @@ void ClientManagerImpl::ack(const std::string& target, const Metadata& metadata, } void ClientManagerImpl::changeInvisibleDuration( - const std::string& target_host, const Metadata& metadata, const ChangeInvisibleDurationRequest& request, - std::chrono::milliseconds timeout, const std::function<void(const std::error_code&)>& completion_callback) { + const std::string& target_host, + const Metadata& metadata, + const ChangeInvisibleDurationRequest& request, + std::chrono::milliseconds timeout, + const std::function<void(const std::error_code&)>& completion_callback) { RpcClientSharedPtr client = getRpcClient(target_host); assert(client); auto invocation_context = new InvocationContext<ChangeInvisibleDurationResponse>(); @@ -1133,7 +1146,7 @@ void ClientManagerImpl::changeInvisibleDuration( ec = ErrorCode::Forbidden; break; } - + case rmq::Code::INTERNAL_SERVER_ERROR: { SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address); ec = ErrorCode::InternalServerError; @@ -1159,7 +1172,9 @@ void ClientManagerImpl::changeInvisibleDuration( } void ClientManagerImpl::endTransaction( - const std::string& target_host, const Metadata& metadata, const EndTransactionRequest& request, + const std::string& target_host, + const Metadata& metadata, + const EndTransactionRequest& request, std::chrono::milliseconds timeout, const std::function<void(const std::error_code&, const EndTransactionResponse&)>& cb) { RpcClientSharedPtr client = getRpcClient(target_host); @@ -1339,7 +1354,7 @@ void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe ec = ErrorCode::ServiceUnavailable; break; } - + case rmq::Code::TOO_MANY_REQUESTS: { ec = ErrorCode::TooManyRequests; break; @@ -1362,7 +1377,8 @@ void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe client->asyncForwardMessageToDeadLetterQueue(request, 
invocation_context); } -std::error_code ClientManagerImpl::notifyClientTermination(const std::string& target_host, const Metadata& metadata, +std::error_code ClientManagerImpl::notifyClientTermination(const std::string& target_host, + const Metadata& metadata, const NotifyClientTerminationRequest& request, std::chrono::milliseconds timeout) { std::error_code ec; @@ -1446,4 +1462,4 @@ void ClientManagerImpl::submit(std::function<void()> task) { const char* ClientManagerImpl::HEARTBEAT_TASK_NAME = "heartbeat-task"; const char* ClientManagerImpl::STATS_TASK_NAME = "stats-task"; -ROCKETMQ_NAMESPACE_END \ No newline at end of file +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/client/SessionImpl.cpp b/cpp/source/client/SessionImpl.cpp index 0ca3fff2..36416829 100644 --- a/cpp/source/client/SessionImpl.cpp +++ b/cpp/source/client/SessionImpl.cpp @@ -16,12 +16,14 @@ */ #include "SessionImpl.h" + #include "rocketmq/Logger.h" #include "spdlog/spdlog.h" ROCKETMQ_NAMESPACE_BEGIN -SessionImpl::SessionImpl(std::weak_ptr<Client> client, std::shared_ptr<RpcClient> rpc_client) : client_(client), rpc_client_(rpc_client) { +SessionImpl::SessionImpl(std::weak_ptr<Client> client, std::shared_ptr<RpcClient> rpc_client) + : client_(client), rpc_client_(rpc_client) { telemetry_ = rpc_client->asyncTelemetry(client_); syncSettings(); } @@ -39,8 +41,8 @@ void SessionImpl::syncSettings() { } SessionImpl::~SessionImpl() { - SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress()); telemetry_->fireClose(); + SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress()); } ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp index 6c5b2f93..d75ac361 100644 --- a/cpp/source/client/TelemetryBidiReactor.cpp +++ b/cpp/source/client/TelemetryBidiReactor.cpp @@ -22,21 +22,24 @@ #include <utility> #include "ClientManager.h" -#include "rocketmq/Logger.h" -#include 
"spdlog/spdlog.h" #include "MessageExt.h" #include "Metadata.h" #include "RpcClient.h" #include "Signature.h" #include "google/protobuf/util/time_util.h" +#include "rocketmq/Logger.h" +#include "spdlog/spdlog.h" ROCKETMQ_NAMESPACE_BEGIN TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client, rmq::MessagingService::Stub* stub, std::string peer_address) - : client_(client), peer_address_(std::move(peer_address)), stream_state_(StreamState::Created) { - auto ptr = client.lock(); + : client_(client), + peer_address_(std::move(peer_address)), + read_state_(StreamState::Created), + write_state_(StreamState::Created) { + auto ptr = client_.lock(); auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1); context_.set_deadline(deadline); Metadata metadata; @@ -45,69 +48,126 @@ TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client, context_.AddMetadata(entry.first, entry.second); } stub->async()->Telemetry(&context_, this); + write_state_ = StreamState::Ready; + // Increase hold for write stream. + AddHold(); StartCall(); } TelemetryBidiReactor::~TelemetryBidiReactor() { - SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}", peer_address_, - static_cast<std::uint8_t>(stream_state_)); + SPDLOG_INFO("Telemetry stream for {} destructed. 
ReadStreamState={}, WriteStreamState={}", peer_address_, + static_cast<std::uint8_t>(read_state_), static_cast<std::uint8_t>(read_state_)); } bool TelemetryBidiReactor::await() { - absl::MutexLock lk(&server_setting_received_mtx_); - if (server_setting_received_) { + absl::MutexLock lk(&state_mtx_); + if (StreamState::Created != write_state_) { return true; } - server_setting_received_cv_.Wait(&server_setting_received_mtx_); - return server_setting_received_; + state_cv_.Wait(&state_mtx_); + return StreamState::Error != write_state_; } void TelemetryBidiReactor::OnWriteDone(bool ok) { - SPDLOG_DEBUG("OnWriteDone: {}", ok); - - { - bool expect = true; - if (!command_inflight_.compare_exchange_strong(expect, false, std::memory_order_relaxed)) { - SPDLOG_WARN("Illegal command-inflight state"); - } - } + SPDLOG_DEBUG("{}#OnWriteDone", peer_address_); if (!ok) { - SPDLOG_WARN("Failed to write telemetry command {} to {}", write_.DebugString(), peer_address_); + RemoveHold(); { - absl::MutexLock lk(&stream_state_mtx_); - stream_state_ = StreamState::WriteDone; - } + absl::MutexLock lk(&state_mtx_); + SPDLOG_WARN("Failed to write telemetry command {} to {}", writes_.front().DebugString(), peer_address_); + write_state_ = StreamState::Error; + + // Sync read state. 
+ switch (read_state_) { + case StreamState::Created: + case StreamState::Ready: { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closed)); + read_state_ = StreamState::Closed; + break; + } + case StreamState::Inflight: { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closing)); + read_state_ = StreamState::Closing; + break; + } + case StreamState::Closing: + case StreamState::Error: + case StreamState::Closed: { + break; + } + } - fireClose(); + state_cv_.SignalAll(); + } return; + } else { + absl::MutexLock lk(&state_mtx_); + if (StreamState::Inflight == write_state_) { + write_state_ = StreamState::Ready; + } } + // Check if the read stream has started. + fireRead(); + + // Remove the command that has been written to server. { - absl::MutexLock lk(&stream_state_mtx_); - if (StreamState::Created == stream_state_) { - stream_state_ = StreamState::Active; - fireRead(); - } + absl::MutexLock lk(&writes_mtx_); + writes_.pop_front(); } - fireWrite(); + tryWriteNext(); } void TelemetryBidiReactor::OnReadDone(bool ok) { - SPDLOG_DEBUG("OnReadDone: ok={}", ok); - if (!ok) { - if (client_.lock()) { - SPDLOG_WARN("Failed to read telemetry command from {}", peer_address_); - } - - { - absl::MutexLock lk(&stream_state_mtx_); - stream_state_ = StreamState::ReadDone; + SPDLOG_DEBUG("{}#OnReadDone", peer_address_); + { + absl::MutexLock lk(&state_mtx_); + if (!ok) { + // Remove read hold. 
+ RemoveHold(); + { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Error)); + read_state_ = StreamState::Error; + SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_); + + // Sync write state + switch (write_state_) { + case StreamState::Created: { + // Not reachable + break; + } + case StreamState::Ready: { + write_state_ = StreamState::Closed; + // There is no inflight write request, remove write hold on its behalf. + RemoveHold(); + state_cv_.SignalAll(); + break; + } + case StreamState::Inflight: { + write_state_ = StreamState::Closing; + break; + } + case StreamState::Closing: + case StreamState::Error: + case StreamState::Closed: { + break; + } + } + } + return; + } else if (StreamState::Closing == read_state_) { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closed)); + read_state_ = StreamState::Closed; + state_cv_.SignalAll(); + return; } - fireClose(); - return; } SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.DebugString()); @@ -122,13 +182,6 @@ void TelemetryBidiReactor::OnReadDone(bool ok) { auto settings = read_.settings(); SPDLOG_INFO("Received settings from {}: {}", peer_address_, settings.DebugString()); applySettings(settings); - { - absl::MutexLock lk(&server_setting_received_mtx_); - if (!server_setting_received_) { - server_setting_received_ = true; - server_setting_received_cv_.SignalAll(); - } - } break; } case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: { @@ -151,11 +204,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) { TelemetryCommand response; response.mutable_thread_stack_trace()->set_nonce(read_.print_thread_stack_trace_command().nonce()); response.mutable_thread_stack_trace()->set_thread_stack_trace("PrintStackTrace is not supported"); - { - absl::MutexLock lk(&writes_mtx_); - writes_.push_back(response); - } - 
fireWrite(); + write(std::move(response)); break; } @@ -188,7 +237,18 @@ void TelemetryBidiReactor::OnReadDone(bool ok) { } } - fireRead(); + { + absl::MutexLock lk(&state_mtx_); + if (StreamState::Inflight == read_state_) { + SPDLOG_DEBUG("Spawn new read op, read-state={}", static_cast<std::uint8_t>(read_state_)); + StartRead(&read_); + } else if (read_state_ == StreamState::Closing) { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closed)); + read_state_ = StreamState::Closed; + state_cv_.SignalAll(); + } + } } void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) { @@ -247,10 +307,33 @@ void TelemetryBidiReactor::applyBackoffPolicy(const rmq::Settings& settings, std } void TelemetryBidiReactor::applyPublishingConfig(const rmq::Settings& settings, std::shared_ptr<Client> client) { + // The server may have implicitly assumed a namespace for the client. + if (!settings.publishing().topics().empty()) { + for (const auto& topic : settings.publishing().topics()) { + if (topic.resource_namespace() != client->config().resource_namespace) { + SPDLOG_INFO("Client namespace is changed from [{}] to [{}]", client->config().resource_namespace, + topic.resource_namespace()); + client->config().resource_namespace = topic.resource_namespace(); + break; + } + } + } client->config().publisher.max_body_size = settings.publishing().max_body_size(); } void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings, std::shared_ptr<Client> client) { + // The server may have implicitly assumed a namespace for the client. 
+ if (!settings.subscription().subscriptions().empty()) { + for (const auto& subscription : settings.subscription().subscriptions()) { + if (subscription.topic().resource_namespace() != client->config().resource_namespace) { + SPDLOG_INFO("Client namespace is changed from [{}] to [{}]", client->config().resource_namespace, + subscription.topic().resource_namespace()); + client->config().resource_namespace = subscription.topic().resource_namespace(); + break; + } + } + } + client->config().subscriber.fifo = settings.subscription().fifo(); auto polling_timeout = google::protobuf::util::TimeUtil::DurationToMilliseconds(settings.subscription().long_polling_timeout()); @@ -259,48 +342,144 @@ void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings } void TelemetryBidiReactor::fireRead() { - SPDLOG_DEBUG("{}#fireRead", peer_address_); + absl::MutexLock lk(&state_mtx_); + if (StreamState::Created != read_state_) { + SPDLOG_DEBUG("Further read from {} is not allowded due to stream-state={}", peer_address_, + static_cast<std::uint8_t>(read_state_)); + return; + } + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Ready)); + read_state_ = StreamState::Ready; + AddHold(); + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Inflight)); + read_state_ = StreamState::Inflight; StartRead(&read_); } void TelemetryBidiReactor::write(TelemetryCommand command) { + SPDLOG_DEBUG("{}#write", peer_address_); + { + absl::MutexLock lk(&state_mtx_); + // Reject incoming write commands if the stream state is closing or has witnessed some error. 
+ switch (write_state_) { + case StreamState::Closing: + case StreamState::Error: + case StreamState::Closed: + return; + default: + // no-op + break; + } + } + { absl::MutexLock lk(&writes_mtx_); writes_.push_back(command); } - fireWrite(); + tryWriteNext(); } -void TelemetryBidiReactor::fireWrite() { - SPDLOG_DEBUG("{}#fireWrite", peer_address_); +void TelemetryBidiReactor::tryWriteNext() { + SPDLOG_DEBUG("{}#tryWriteNext", peer_address_); + { + absl::MutexLock lk(&state_mtx_); + if (StreamState::Error == write_state_ || StreamState::Closed == write_state_) { + SPDLOG_WARN("Further write to {} is not allowded due to stream-state={}", peer_address_, + static_cast<std::uint8_t>(write_state_)); + return; + } + } + { absl::MutexLock lk(&writes_mtx_); - if (writes_.empty()) { + if (writes_.empty() && StreamState::Closing != write_state_) { SPDLOG_DEBUG("No TelemtryCommand to write. Peer={}", peer_address_); return; } - bool expect = false; - if (command_inflight_.compare_exchange_strong(expect, true, std::memory_order_relaxed)) { - write_ = std::move(*writes_.begin()); - writes_.erase(writes_.begin()); + if (StreamState::Ready == write_state_) { + write_state_ = StreamState::Inflight; + } + + if (writes_.empty()) { + // Tell server there is no more write requests. + StartWritesDone(); } else { - SPDLOG_DEBUG("Another command is already on the wire. 
Peer={}", peer_address_); - return; + SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().DebugString()); + StartWrite(&(writes_.front())); } } - SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, write_.DebugString()); - StartWrite(&write_); } void TelemetryBidiReactor::fireClose() { SPDLOG_INFO("{}#fireClose", peer_address_); - if (StreamState::Active == stream_state_) { - StartWritesDone(); - { - absl::MutexLock lk(&stream_state_mtx_); - if (StreamState::Active == stream_state_) { - stream_state_cv_.Wait(&stream_state_mtx_); + + { + // Acquire state lock + absl::MutexLock lk(&state_mtx_); + + // Transition read state + switch (read_state_) { + case StreamState::Created: + case StreamState::Ready: { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closed)); + read_state_ = StreamState::Closed; + state_cv_.SignalAll(); + break; + } + + case StreamState::Inflight: { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closing)); + read_state_ = StreamState::Closing; + break; + } + case StreamState::Closing: { + break; + } + case StreamState::Closed: + case StreamState::Error: { + state_cv_.SignalAll(); + break; + } + } + + // Transition write state + switch (write_state_) { + case StreamState::Created: + case StreamState::Ready: + case StreamState::Inflight: { + SPDLOG_DEBUG("Change write-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closing)); + write_state_ = StreamState::Closing; + break; + } + case StreamState::Closing: { + break; + } + case StreamState::Closed: + case StreamState::Error: { + state_cv_.SignalAll(); + break; + } + } + } + + if (StreamState::Closing == write_state_) { + tryWriteNext(); + } + + { + // Acquire state lock + absl::MutexLock lk(&state_mtx_); + while ((StreamState::Closed != read_state_ 
&& StreamState::Error != read_state_) || + (StreamState::Closed != write_state_ && StreamState::Error != write_state_)) { + if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) { + SPDLOG_WARN("StreamState CondVar timed out before getting signalled: read-state={}, write-state={}", + static_cast<uint8_t>(read_state_), static_cast<uint8_t>(write_state_)); } } } @@ -308,6 +487,20 @@ void TelemetryBidiReactor::fireClose() { void TelemetryBidiReactor::OnWritesDoneDone(bool ok) { SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_); + assert(StreamState::Closing == write_state_); + + absl::MutexLock lk(&state_mtx_); + // Remove the hold for the write stream. + RemoveHold(); + + if (!ok) { + write_state_ = StreamState::Error; + SPDLOG_WARN("Previous telemetry write to {} failed", peer_address_); + } else { + write_state_ = StreamState::Closed; + SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_); + } + state_cv_.SignalAll(); } void TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) { @@ -315,7 +508,7 @@ void TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) { absl::MutexLock lk(&writes_mtx_); writes_.emplace_back(command); } - fireWrite(); + tryWriteNext(); } /// Notifies the application that all operations associated with this RPC @@ -334,9 +527,18 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& status) { { SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_); - absl::MutexLock lk(&stream_state_mtx_); - stream_state_ = StreamState::Closed; - stream_state_cv_.SignalAll(); + absl::MutexLock lk(&state_mtx_); + if (StreamState::Error != read_state_) { + SPDLOG_DEBUG("Change read-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closed)); + read_state_ = StreamState::Closed; + } + if (StreamState::Error != write_state_) { + SPDLOG_DEBUG("Change write-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Closed)); + 
write_state_ = StreamState::Closed; + } + state_cv_.SignalAll(); } auto client = client_.lock(); @@ -349,4 +551,20 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& status) { } } -ROCKETMQ_NAMESPACE_END \ No newline at end of file +void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) { + SPDLOG_DEBUG("{}#OnReadInitialMetadataDone", peer_address_); + + if (!ok) { + absl::MutexLock lk(&state_mtx_); + SPDLOG_DEBUG("Change write-state {} --> {}", static_cast<std::uint8_t>(read_state_), + static_cast<std::uint8_t>(StreamState::Error)); + read_state_ = StreamState::Error; + state_cv_.SignalAll(); + SPDLOG_WARN("Read of initial-metadata failed from {}", peer_address_); + return; + } + + SPDLOG_DEBUG("Received initial metadata from {}", peer_address_); +} + +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/client/include/ClientManager.h b/cpp/source/client/include/ClientManager.h index 56325fa4..02b232b2 100644 --- a/cpp/source/client/include/ClientManager.h +++ b/cpp/source/client/include/ClientManager.h @@ -22,14 +22,12 @@ #include <system_error> #include "Client.h" -#include "MessageExt.h" #include "Metadata.h" #include "ReceiveMessageCallback.h" #include "RpcClient.h" #include "Scheduler.h" -#include "TelemetryBidiReactor.h" +#include "SendResultCallback.h" #include "TopicRouteData.h" -#include "rocketmq/SendCallback.h" #include "rocketmq/State.h" ROCKETMQ_NAMESPACE_BEGIN @@ -93,8 +91,10 @@ public: virtual void receiveMessage(const std::string& target, const Metadata& metadata, const ReceiveMessageRequest& request, std::chrono::milliseconds timeout, ReceiveMessageCallback callback) = 0; - virtual bool send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request, - SendCallback cb) = 0; + virtual bool send(const std::string& target_host, + const Metadata& metadata, + SendMessageRequest& request, + SendResultCallback cb) = 0; virtual std::error_code notifyClientTermination(const std::string& target_host, const Metadata& 
metadata, const NotifyClientTerminationRequest& request, diff --git a/cpp/source/client/include/ClientManagerImpl.h b/cpp/source/client/include/ClientManagerImpl.h index 653fcad3..5f1b27ca 100644 --- a/cpp/source/client/include/ClientManagerImpl.h +++ b/cpp/source/client/include/ClientManagerImpl.h @@ -20,7 +20,6 @@ #include <chrono> #include <cstdint> #include <functional> -#include <future> #include <memory> #include <string> #include <system_error> @@ -29,18 +28,13 @@ #include "Client.h" #include "ClientManager.h" #include "InsecureCertificateVerifier.h" -#include "InvocationContext.h" #include "ReceiveMessageCallback.h" #include "RpcClientImpl.h" -#include "SchedulerImpl.h" -#include "SendMessageContext.h" -#include "TelemetryBidiReactor.h" #include "ThreadPoolImpl.h" #include "TopicRouteData.h" #include "absl/base/thread_annotations.h" #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" -#include "absl/strings/string_view.h" #include "absl/synchronization/mutex.h" #include "rocketmq/State.h" @@ -54,7 +48,7 @@ public: * effectively. * @param resource_namespace Abstract resource namespace, in which this client manager lives. 
*/ - explicit ClientManagerImpl(std::string resource_namespace, bool withSsl = true); + explicit ClientManagerImpl(std::string resource_namespace, bool with_ssl = true); ~ClientManagerImpl() override; @@ -89,7 +83,7 @@ public: bool send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request, - SendCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_); + SendResultCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_); /** * Get a RpcClient according to the given target hosts, which follows scheme specified @@ -105,7 +99,7 @@ public: RpcClientSharedPtr getRpcClient(const std::string& target_host, bool need_heartbeat = true) override LOCKS_EXCLUDED(rpc_clients_mtx_); - static SendReceipt processSendResponse(const rmq::MessageQueue& message_queue, + static SendResult processSendResponse(const rmq::MessageQueue& message_queue, const SendMessageResponse& response, std::error_code& ec); @@ -242,7 +236,7 @@ private: grpc::ChannelArguments channel_arguments_; bool trace_{false}; - bool withSsl_; + bool with_ssl_; }; ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/source/client/include/SendResult.h similarity index 88% copy from cpp/include/rocketmq/SendReceipt.h copy to cpp/source/client/include/SendResult.h index 489df5ec..3596f61f 100644 --- a/cpp/include/rocketmq/SendReceipt.h +++ b/cpp/source/client/include/SendResult.h @@ -16,20 +16,18 @@ */ #pragma once -#include <cstdint> -#include <string> -#include <utility> +#include <system_error> -#include "RocketMQ.h" +#include "rocketmq/RocketMQ.h" ROCKETMQ_NAMESPACE_BEGIN -struct SendReceipt { - std::string message_id; +struct SendResult { + std::error_code ec; + std::string target; + std::string message_id; std::string transaction_id; - - std::string target; }; ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/source/client/include/SendResultCallback.h similarity index 80% copy from 
cpp/include/rocketmq/SendReceipt.h copy to cpp/source/client/include/SendResultCallback.h index 489df5ec..182f649a 100644 --- a/cpp/include/rocketmq/SendReceipt.h +++ b/cpp/source/client/include/SendResultCallback.h @@ -16,20 +16,12 @@ */ #pragma once -#include <cstdint> -#include <string> -#include <utility> +#include <functional> -#include "RocketMQ.h" +#include "SendResult.h" ROCKETMQ_NAMESPACE_BEGIN -struct SendReceipt { - std::string message_id; - - std::string transaction_id; - - std::string target; -}; +using SendResultCallback = std::function<void(const SendResult&)>; ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/client/include/TelemetryBidiReactor.h b/cpp/source/client/include/TelemetryBidiReactor.h index 9fe65f31..3bdbe3d3 100644 --- a/cpp/source/client/include/TelemetryBidiReactor.h +++ b/cpp/source/client/include/TelemetryBidiReactor.h @@ -19,6 +19,7 @@ #include <atomic> #include <chrono> #include <cstdint> +#include <list> #include <memory> #include <utility> #include <vector> @@ -34,12 +35,29 @@ ROCKETMQ_NAMESPACE_BEGIN enum class StreamState : std::uint8_t { Created = 0, - Active = 1, - ReadDone = 2, - WriteDone = 3, + Ready = 1, + Inflight = 2, + Closing = 3, Closed = 4, + Error = 5, }; +/// write-stream-state: created --> ready --> inflight --> ready --> ... 
+/// --> error +/// --> closing --> closed +/// --> closing --> closed +/// --> error +/// +/// +/// read-stream-state: created --> ready --> inflight --> inflight +/// --> closing --> closed +/// --> error +/// --> closed +/// requirement: +/// 1, fireClose --> blocking await till bidireactor is closed; +/// 2, when session is closed and client is still active, recreate a new session to accept incoming commands from +/// server 3, after writing the first Settings telemetry command, launch the read directional stream +/// class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, TelemetryCommand>, public std::enable_shared_from_this<TelemetryBidiReactor> { public: @@ -47,24 +65,54 @@ public: ~TelemetryBidiReactor(); - void OnWriteDone(bool ok) override; - - void OnWritesDoneDone(bool ok) override; - - void OnReadDone(bool ok) override; - + /// Notifies the application that all operations associated with this RPC + /// have completed and all Holds have been removed. OnDone provides the RPC + /// status outcome for both successful and failed RPCs and will be called in + /// all cases. If it is not called, it indicates an application-level problem + /// (like failure to remove a hold). + /// + /// \param[in] s The status outcome of this RPC void OnDone(const grpc::Status& status) override; - void fireRead(); + /// Notifies the application that a read of initial metadata from the + /// server is done. If the application chooses not to implement this method, + /// it can assume that the initial metadata has been read before the first + /// call of OnReadDone or OnDone. + /// + /// \param[in] ok Was the initial metadata read successfully? If false, no + /// new read/write operation will succeed, and any further + /// Start* operations should not be called. + void OnReadInitialMetadataDone(bool /*ok*/) override; + + /// Notifies the application that a StartRead operation completed. + /// + /// \param[in] ok Was it successful? 
If false, no new read/write operation + /// will succeed, and any further Start* should not be called. + void OnReadDone(bool ok) override; - void fireWrite(); + /// Notifies the application that a StartWrite or StartWriteLast operation + /// completed. + /// + /// \param[in] ok Was it successful? If false, no new read/write operation + /// will succeed, and any further Start* should not be called. + void OnWriteDone(bool ok) override; - void fireClose(); + /// Notifies the application that a StartWritesDone operation completed. Note + /// that this is only used on explicit StartWritesDone operations and not for + /// those that are implicitly invoked as part of a StartWriteLast. + /// + /// \param[in] ok Was it successful? If false, the application will later see + /// the failure reflected as a bad status in OnDone and no + /// further Start* should be called. + void OnWritesDoneDone(bool ok) override; + /// Core API method to initiate this bidirectional stream. void write(TelemetryCommand command); bool await(); + void fireClose(); + private: grpc::ClientContext context_; @@ -75,17 +123,15 @@ private: /** * @brief Buffered commands to write to server + * + * TODO: move buffered commands to a shared container, which may survive + * multiple TelemetryBidiReactor lifecycles. */ - std::vector<TelemetryCommand> writes_ GUARDED_BY(writes_mtx_); + std::list<TelemetryCommand> writes_ GUARDED_BY(writes_mtx_); absl::Mutex writes_mtx_; /** - * @brief The command that is currently being written back to server. - */ - TelemetryCommand write_; - - /** - * @brief Each TelemetryBidiReactor belongs to a specific client as its owner. + * @brief Each TelemetryBidiReactor belongs to a specific client as its owner. */ std::weak_ptr<Client> client_; @@ -94,18 +140,12 @@ private: */ std::string peer_address_; - /** - * @brief Indicate if there is a command being written to network. 
- */ - std::atomic_bool command_inflight_{false}; - - StreamState stream_state_ GUARDED_BY(stream_state_mtx_); - absl::Mutex stream_state_mtx_; - absl::CondVar stream_state_cv_; + StreamState read_state_ GUARDED_BY(state_mtx_); + StreamState write_state_ GUARDED_BY(state_mtx_); + absl::Mutex state_mtx_; + absl::CondVar state_cv_; - bool server_setting_received_ GUARDED_BY(server_setting_received_mtx_){false}; - absl::Mutex server_setting_received_mtx_; - absl::CondVar server_setting_received_cv_; + void changeStreamStateThenNotify(StreamState state); void onVerifyMessageResult(TelemetryCommand command); @@ -116,6 +156,14 @@ private: void applyPublishingConfig(const rmq::Settings& settings, std::shared_ptr<Client> client); void applySubscriptionConfig(const rmq::Settings& settings, std::shared_ptr<Client> client); + + /// Start the read stream. + /// + /// Once got the OnReadDone and status is OK, call StartRead immediately. + void fireRead(); + + /// Attempt to write pending telemetry command to server. + void tryWriteNext() LOCKS_EXCLUDED(state_mtx_, writes_mtx_); }; -ROCKETMQ_NAMESPACE_END \ No newline at end of file +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/exports.map b/cpp/source/exports.map new file mode 100644 index 00000000..99ae5d1d --- /dev/null +++ b/cpp/source/exports.map @@ -0,0 +1,6 @@ +{ + global: + *rocketmq*; + local: + *; +}; \ No newline at end of file diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/source/rocketmq/FifoContext.cpp similarity index 70% copy from cpp/include/rocketmq/SendReceipt.h copy to cpp/source/rocketmq/FifoContext.cpp index 489df5ec..1cf5bab2 100644 --- a/cpp/include/rocketmq/SendReceipt.h +++ b/cpp/source/rocketmq/FifoContext.cpp @@ -14,22 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -#pragma once +#include "FifoContext.h" -#include <cstdint> -#include <string> -#include <utility> - -#include "RocketMQ.h" +#include "rocketmq/RocketMQ.h" ROCKETMQ_NAMESPACE_BEGIN -struct SendReceipt { - std::string message_id; - - std::string transaction_id; +FifoContext::FifoContext(MessageConstPtr message, SendCallback callback) + : message(std::move(message)), callback(callback) { +} - std::string target; -}; +FifoContext::FifoContext(FifoContext&& rhs) noexcept { + this->message = std::move(rhs.message); + this->callback = rhs.callback; +} ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/rocketmq/FifoProducer.cpp b/cpp/source/rocketmq/FifoProducer.cpp new file mode 100644 index 00000000..b52630b0 --- /dev/null +++ b/cpp/source/rocketmq/FifoProducer.cpp @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "rocketmq/FifoProducer.h" + +#include <cstddef> +#include <memory> + +#include "FifoProducerImpl.h" +#include "ProducerImpl.h" +#include "StaticNameServerResolver.h" +#include "rocketmq/Configuration.h" +#include "rocketmq/Message.h" +#include "rocketmq/RocketMQ.h" +#include "rocketmq/SendCallback.h" + +ROCKETMQ_NAMESPACE_BEGIN + +FifoProducerBuilder FifoProducer::newBuilder() { + return {}; +} + +FifoProducerBuilder::FifoProducerBuilder() : producer_impl_(std::make_shared<ProducerImpl>()) { +} + +FifoProducerBuilder& FifoProducerBuilder::withConfiguration(Configuration configuration) { + auto name_server_resolver = std::make_shared<StaticNameServerResolver>(configuration.endpoints()); + producer_impl_->withNameServerResolver(std::move(name_server_resolver)); + producer_impl_->withCredentialsProvider(configuration.credentialsProvider()); + producer_impl_->withRequestTimeout(configuration.requestTimeout()); + producer_impl_->withSsl(configuration.withSsl()); + return *this; +} + +FifoProducerBuilder& FifoProducerBuilder::withTopics(const std::vector<std::string>& topics) { + producer_impl_->withTopics(topics); + return *this; +} + +FifoProducerBuilder& FifoProducerBuilder::withConcurrency(std::size_t concurrency) { + this->impl_ = std::make_shared<FifoProducerImpl>(producer_impl_, concurrency); + return *this; +} + +FifoProducer FifoProducerBuilder::build() { + FifoProducer fifo_producer(this->impl_); + fifo_producer.start(); + return fifo_producer; +} + +void FifoProducer::start() { + impl_->internalProducer()->start(); +} + +void FifoProducer::send(MessageConstPtr message, SendCallback callback) { + impl_->send(std::move(message), callback); +} + +ROCKETMQ_NAMESPACE_END diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/source/rocketmq/FifoProducerImpl.cpp similarity index 64% copy from cpp/include/rocketmq/SendReceipt.h copy to cpp/source/rocketmq/FifoProducerImpl.cpp index 489df5ec..ad08c4d9 100644 --- a/cpp/include/rocketmq/SendReceipt.h +++ 
b/cpp/source/rocketmq/FifoProducerImpl.cpp @@ -14,22 +14,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#pragma once +#include "FifoProducerImpl.h" -#include <cstdint> -#include <string> #include <utility> -#include "RocketMQ.h" +#include "FifoContext.h" +#include "rocketmq/Message.h" +#include "rocketmq/RocketMQ.h" +#include "rocketmq/SendCallback.h" ROCKETMQ_NAMESPACE_BEGIN -struct SendReceipt { - std::string message_id; +void FifoProducerImpl::send(MessageConstPtr message, SendCallback callback) { + auto& group = message->group(); + std::size_t hash = hash_fn_(group); + std::size_t slot = hash % concurrency_; - std::string transaction_id; - - std::string target; -}; + FifoContext context(std::move(message), callback); + partitions_[slot]->add(std::move(context)); +} ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/rocketmq/FifoProducerPartition.cpp b/cpp/source/rocketmq/FifoProducerPartition.cpp new file mode 100644 index 00000000..8a2f06ff --- /dev/null +++ b/cpp/source/rocketmq/FifoProducerPartition.cpp @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "FifoProducerPartition.h" + +#include "absl/synchronization/mutex.h" + +#include <atomic> +#include <memory> +#include <system_error> + +#include "FifoContext.h" +#include "rocketmq/Message.h" +#include "rocketmq/RocketMQ.h" +#include "rocketmq/SendCallback.h" +#include "rocketmq/SendReceipt.h" +#include "spdlog/spdlog.h" + +ROCKETMQ_NAMESPACE_BEGIN + +void FifoProducerPartition::add(FifoContext&& context) { + { + absl::MutexLock lk(&messages_mtx_); + messages_.emplace_back(std::move(context)); + SPDLOG_DEBUG("{} has {} pending messages after #add", name_, messages_.size()); + } + + trySend(); +} + +void FifoProducerPartition::trySend() { + bool expected = false; + if (inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) { + absl::MutexLock lk(&messages_mtx_); + + if (messages_.empty()) { + SPDLOG_DEBUG("There is no more messages to send"); + return; + } + + FifoContext& ctx = messages_.front(); + MessageConstPtr message = std::move(ctx.message); + SendCallback send_callback = ctx.callback; + + std::shared_ptr<FifoProducerPartition> partition = shared_from_this(); + auto fifo_callback = [=](const std::error_code& ec, const SendReceipt& receipt) mutable { + partition->onComplete(ec, receipt, send_callback); + }; + SPDLOG_DEBUG("Sending FIFO message from {}", name_); + producer_->send(std::move(message), fifo_callback); + messages_.pop_front(); + SPDLOG_DEBUG("In addition to the inflight one, there is {} messages pending in {}", messages_.size(), name_); + } else { + SPDLOG_DEBUG("There is an inflight message"); + } +} + +void FifoProducerPartition::onComplete(const std::error_code& ec, const SendReceipt& receipt, SendCallback& callback) { + if (ec) { + SPDLOG_INFO("{} completed with a failure: {}", name_, ec.message()); + } else { + SPDLOG_DEBUG("{} completed OK", name_); + } + + if (!ec) { + callback(ec, receipt); + // update inflight status + bool expected = true; + if (inflight_.compare_exchange_strong(expected, false, 
std::memory_order_relaxed)) { + trySend(); + } else { + SPDLOG_ERROR("{}: Unexpected inflight status", name_); + } + return; + } + + // Put the message back to the front of the list + SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt); + FifoContext retry_context(std::move(receipt_mut.message), callback); + { + absl::MutexLock lk(&messages_mtx_); + messages_.emplace_front(std::move(retry_context)); + } + + // Update inflight status + bool expected = true; + if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) { + trySend(); + } else { + SPDLOG_ERROR("Unexpected inflight status"); + } +} + +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp index 78d812ed..907d0a28 100644 --- a/cpp/source/rocketmq/Producer.cpp +++ b/cpp/source/rocketmq/Producer.cpp @@ -21,12 +21,8 @@ #include <system_error> #include <utility> -#include "rocketmq/Logger.h" -#include "spdlog/spdlog.h" -#include "MixAll.h" #include "ProducerImpl.h" #include "StaticNameServerResolver.h" -#include "absl/strings/str_split.h" #include "rocketmq/ErrorCode.h" #include "rocketmq/SendReceipt.h" #include "rocketmq/Transaction.h" diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp index 73130161..34c5b29c 100644 --- a/cpp/source/rocketmq/ProducerImpl.cpp +++ b/cpp/source/rocketmq/ProducerImpl.cpp @@ -17,38 +17,27 @@ #include "ProducerImpl.h" #include <algorithm> -#include <apache/rocketmq/v2/definition.pb.h> - #include <atomic> #include <cassert> #include <chrono> -#include <limits> #include <memory> #include <system_error> #include <utility> -#include "Client.h" -#include "MessageGroupQueueSelector.h" -#include "MetadataConstants.h" +#include "apache/rocketmq/v2/definition.pb.h" #include "MixAll.h" #include "Protocol.h" #include "PublishInfoCallback.h" -#include "RpcClient.h" #include "SendContext.h" -#include "SendMessageContext.h" #include "Signature.h" -#include "Tag.h" #include 
"TracingUtility.h" #include "TransactionImpl.h" -#include "UniqueIdGenerator.h" #include "UtilAll.h" -#include "absl/strings/str_join.h" #include "opencensus/trace/propagation/trace_context.h" #include "opencensus/trace/span.h" #include "rocketmq/ErrorCode.h" #include "rocketmq/Message.h" #include "rocketmq/SendReceipt.h" -#include "rocketmq/Tracing.h" #include "rocketmq/Transaction.h" #include "rocketmq/TransactionChecker.h" @@ -203,19 +192,30 @@ void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageReq SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) noexcept { ensureRunning(ec); if (ec) { - return {}; + SPDLOG_WARN("Producer is not running"); + SendReceipt send_receipt{}; + send_receipt.message = std::move(message); + return send_receipt; } auto topic_publish_info = getPublishInfo(message->topic()); if (!topic_publish_info) { + SPDLOG_WARN("Route of topic[{}] is not found", message->topic()); ec = ErrorCode::NotFound; - return {}; + SendReceipt send_receipt{}; + send_receipt.message = std::move(message); + return send_receipt; } std::vector<rmq::MessageQueue> message_queue_list; - if (!topic_publish_info->selectMessageQueues(absl::make_optional<std::string>(), message_queue_list)) { + // null_opt_t + absl::optional<std::string> message_group{}; + if (!topic_publish_info->selectMessageQueues(message_group, message_queue_list)) { + SPDLOG_WARN("Failed to select an addressable message queue for topic[{}]", message->topic()); ec = ErrorCode::NotFound; - return {}; + SendReceipt send_receipt{}; + send_receipt.message = std::move(message); + return send_receipt; } auto mtx = std::make_shared<absl::Mutex>(); @@ -224,9 +224,10 @@ SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) noe SendReceipt send_receipt; // Define callback - auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& receipt) { + auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& 
receipt) mutable { ec = code; - send_receipt = receipt; + SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt); + send_receipt.message = std::move(receipt_mut.message); { absl::MutexLock lk(mtx.get()); completed = true; @@ -251,6 +252,7 @@ void ProducerImpl::send(MessageConstPtr message, SendCallback cb) { ensureRunning(ec); if (ec) { SendReceipt send_receipt; + send_receipt.message = std::move(message); cb(ec, send_receipt); } @@ -264,6 +266,7 @@ void ProducerImpl::send(MessageConstPtr message, SendCallback cb) { // No route entries of the given topic is available if (ec) { SendReceipt send_receipt; + send_receipt.message = std::move(ptr); cb(ec, send_receipt); return; } @@ -271,6 +274,7 @@ void ProducerImpl::send(MessageConstPtr message, SendCallback cb) { if (!publish_info) { std::error_code ec = ErrorCode::NotFound; SendReceipt send_receipt; + send_receipt.message = std::move(ptr); cb(ec, send_receipt); return; } @@ -280,6 +284,7 @@ void ProducerImpl::send(MessageConstPtr message, SendCallback cb) { if (!publish_info->selectMessageQueues(ptr->group(), message_queue_list)) { std::error_code ec = ErrorCode::NotFound; SendReceipt send_receipt; + send_receipt.message = std::move(ptr); cb(ec, send_receipt); return; } @@ -338,12 +343,12 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) { Metadata metadata; Signature::sign(client_config_, metadata); - auto callback = [context](const std::error_code& ec, const SendReceipt& send_receipt) { - if (ec) { - context->onFailure(ec); + auto callback = [context](const SendResult& send_result) { + if (send_result.ec) { + context->onFailure(send_result.ec); return; } - context->onSuccess(send_receipt); + context->onSuccess(send_result); }; client_manager_->send(target, metadata, request, callback); @@ -354,12 +359,14 @@ void ProducerImpl::send0(MessageConstPtr message, SendCallback callback, std::ve std::error_code ec; validate(*message, ec); if (ec) { + send_receipt.message = std::move(message); 
callback(ec, send_receipt); return; } if (list.empty()) { ec = ErrorCode::NotFound; + send_receipt.message = std::move(message); callback(ec, send_receipt); return; } diff --git a/cpp/source/rocketmq/SendContext.cpp b/cpp/source/rocketmq/SendContext.cpp index 385a1a99..bd97384d 100644 --- a/cpp/source/rocketmq/SendContext.cpp +++ b/cpp/source/rocketmq/SendContext.cpp @@ -21,41 +21,44 @@ #include "ProducerImpl.h" #include "PublishStats.h" #include "Tag.h" -#include "TransactionImpl.h" -#include "opencensus/trace/propagation/trace_context.h" #include "opencensus/trace/span.h" -#include "rocketmq/Logger.h" +#include "rocketmq/ErrorCode.h" #include "rocketmq/SendReceipt.h" #include "spdlog/spdlog.h" ROCKETMQ_NAMESPACE_BEGIN -void SendContext::onSuccess(const SendReceipt& send_receipt) noexcept { +void SendContext::onSuccess(const SendResult& send_result) noexcept { { // Mark end of send-message span. span_.SetStatus(opencensus::trace::StatusCode::OK); span_.End(); } - auto publisher = producer_.lock(); - if (!publisher) { + auto producer = producer_.lock(); + if (!producer) { + SPDLOG_WARN("Producer has been destructed"); return; } // Collect metrics { auto duration = std::chrono::steady_clock::now() - request_time_; - opencensus::stats::Record({{publisher->stats().latency(), MixAll::millisecondsOf(duration)}}, + opencensus::stats::Record({{producer->stats().latency(), MixAll::millisecondsOf(duration)}}, { {Tag::topicTag(), message_->topic()}, - {Tag::clientIdTag(), publisher->config().client_id}, + {Tag::clientIdTag(), producer->config().client_id}, {Tag::invocationStatusTag(), "success"}, }); } // send_receipt.traceContext(opencensus::trace::propagation::ToTraceParentHeader(span_.context())); - std::error_code ec; - callback_(ec, send_receipt); + SendReceipt send_receipt = {}; + send_receipt.target = send_result.target; + send_receipt.message_id = send_result.message_id; + send_receipt.transaction_id = send_result.transaction_id; + send_receipt.message = 
std::move(message_); + callback_(send_result.ec, send_receipt); } void SendContext::onFailure(const std::error_code& ec) noexcept { @@ -65,38 +68,36 @@ void SendContext::onFailure(const std::error_code& ec) noexcept { span_.End(); } - auto publisher = producer_.lock(); - if (!publisher) { + auto producer = producer_.lock(); + if (!producer) { + SPDLOG_WARN("Producer has been destructed"); return; } // Collect metrics { auto duration = std::chrono::steady_clock::now() - request_time_; - opencensus::stats::Record({{publisher->stats().latency(), MixAll::millisecondsOf(duration)}}, + opencensus::stats::Record({{producer->stats().latency(), MixAll::millisecondsOf(duration)}}, { {Tag::topicTag(), message_->topic()}, - {Tag::clientIdTag(), publisher->config().client_id}, + {Tag::clientIdTag(), producer->config().client_id}, {Tag::invocationStatusTag(), "failure"}, }); } - if (++attempt_times_ >= publisher->maxAttemptTimes()) { - SPDLOG_WARN("Retried {} times, which exceeds the limit: {}", attempt_times_, publisher->maxAttemptTimes()); - callback_(ec, {}); - return; - } - - std::shared_ptr<ProducerImpl> producer = producer_.lock(); - if (!producer) { - SPDLOG_WARN("Producer has been destructed"); - callback_(ec, {}); + if (++attempt_times_ >= producer->maxAttemptTimes()) { + SPDLOG_WARN("Retried {} times, which exceeds the limit: {}", attempt_times_, producer->maxAttemptTimes()); + SendReceipt receipt{}; + receipt.message = std::move(message_); + callback_(ec, receipt); return; } if (candidates_.empty()) { SPDLOG_WARN("No alternative hosts to perform additional retries"); - callback_(ec, {}); + SendReceipt receipt{}; + receipt.message = std::move(message_); + callback_(ec, receipt); return; } @@ -106,7 +107,7 @@ void SendContext::onFailure(const std::error_code& ec) noexcept { auto ctx = shared_from_this(); // If publish message requests are throttled, retry after backoff if (ErrorCode::TooManyRequests == ec) { - auto&& backoff = publisher->backoff(attempt_times_); + 
auto&& backoff = producer->backoff(attempt_times_); SPDLOG_DEBUG("Publish message[topic={}, message-id={}] is throttled. Retry after {}ms", message_->topic(), message_->id(), MixAll::millisecondsOf(backoff)); auto retry_cb = [=]() { producer->sendImpl(ctx); }; diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h index c266047a..d7693962 100644 --- a/cpp/source/rocketmq/include/ClientImpl.h +++ b/cpp/source/rocketmq/include/ClientImpl.h @@ -94,8 +94,8 @@ public: client_config_.request_timeout = absl::FromChrono(request_timeout); } - void withSsl(bool enable) { - client_config_.withSsl = enable; + void withSsl(bool with_ssl) { + client_config_.withSsl = with_ssl; } /** diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/source/rocketmq/include/FifoContext.h similarity index 74% copy from cpp/include/rocketmq/SendReceipt.h copy to cpp/source/rocketmq/include/FifoContext.h index 489df5ec..2fc3492e 100644 --- a/cpp/include/rocketmq/SendReceipt.h +++ b/cpp/source/rocketmq/include/FifoContext.h @@ -16,20 +16,19 @@ */ #pragma once -#include <cstdint> -#include <string> -#include <utility> - -#include "RocketMQ.h" +#include "rocketmq/Message.h" +#include "rocketmq/RocketMQ.h" +#include "rocketmq/SendCallback.h" ROCKETMQ_NAMESPACE_BEGIN -struct SendReceipt { - std::string message_id; +struct FifoContext { + MessageConstPtr message; + SendCallback callback; - std::string transaction_id; + FifoContext(MessageConstPtr message, SendCallback callback); - std::string target; + FifoContext(FifoContext&& rhs) noexcept; }; ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/rocketmq/include/FifoProducerImpl.h b/cpp/source/rocketmq/include/FifoProducerImpl.h new file mode 100644 index 00000000..ffc9c8a6 --- /dev/null +++ b/cpp/source/rocketmq/include/FifoProducerImpl.h @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <cstddef> +#include <memory> +#include <vector> + +#include "FifoProducerPartition.h" +#include "ProducerImpl.h" +#include "fmt/format.h" +#include "rocketmq/Message.h" +#include "rocketmq/SendCallback.h" + +ROCKETMQ_NAMESPACE_BEGIN + +class FifoProducerImpl : std::enable_shared_from_this<FifoProducerImpl> { +public: + FifoProducerImpl(std::shared_ptr<ProducerImpl> producer, std::size_t concurrency) + : producer_(producer), concurrency_(concurrency), partitions_(concurrency) { + for (auto i = 0; i < concurrency; i++) { + partitions_[i] = std::make_shared<FifoProducerPartition>(producer_, fmt::format("slot-{}", i)); + } + }; + + void send(MessageConstPtr message, SendCallback callback); + + std::shared_ptr<ProducerImpl>& internalProducer() { + return producer_; + } + +private: + std::shared_ptr<ProducerImpl> producer_; + std::vector<std::shared_ptr<FifoProducerPartition>> partitions_; + std::size_t concurrency_; + std::hash<std::string> hash_fn_; +}; + +ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/rocketmq/include/FifoProducerPartition.h b/cpp/source/rocketmq/include/FifoProducerPartition.h new file mode 100644 index 00000000..8d0e00d8 --- /dev/null +++ b/cpp/source/rocketmq/include/FifoProducerPartition.h @@ -0,0 +1,55 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "absl/base/internal/thread_annotations.h" + +#include <atomic> +#include <list> +#include <memory> +#include <system_error> + +#include "FifoContext.h" +#include "ProducerImpl.h" +#include "absl/base/thread_annotations.h" +#include "absl/synchronization/mutex.h" +#include "rocketmq/SendCallback.h" +#include "rocketmq/SendReceipt.h" + +ROCKETMQ_NAMESPACE_BEGIN + +class FifoProducerPartition : public std::enable_shared_from_this<FifoProducerPartition> { +public: + FifoProducerPartition(std::shared_ptr<ProducerImpl> producer, std::string&& name) + : producer_(producer), name_(std::move(name)) { + } + + void add(FifoContext&& context) LOCKS_EXCLUDED(messages_mtx_); + + void trySend() LOCKS_EXCLUDED(messages_mtx_); + + void onComplete(const std::error_code& ec, const SendReceipt& receipt, SendCallback& callback); + +private: + std::shared_ptr<ProducerImpl> producer_; + std::list<FifoContext> messages_ GUARDED_BY(messages_mtx_); + absl::Mutex messages_mtx_; + std::atomic_bool inflight_{false}; + std::string name_; +}; + +ROCKETMQ_NAMESPACE_END diff --git a/cpp/source/rocketmq/include/ProducerImpl.h b/cpp/source/rocketmq/include/ProducerImpl.h index 
d7260a93..b572f20d 100644 --- a/cpp/source/rocketmq/include/ProducerImpl.h +++ b/cpp/source/rocketmq/include/ProducerImpl.h @@ -16,28 +16,23 @@ */ #pragma once -#include <chrono> #include <memory> -#include <mutex> #include <string> #include <system_error> #include "ClientImpl.h" -#include "ClientManagerImpl.h" #include "MixAll.h" #include "PublishInfoCallback.h" +#include "PublishStats.h" #include "SendContext.h" #include "TopicPublishInfo.h" #include "TransactionImpl.h" #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" -#include "absl/strings/string_view.h" #include "rocketmq/Message.h" #include "rocketmq/SendCallback.h" #include "rocketmq/SendReceipt.h" -#include "rocketmq/State.h" #include "rocketmq/TransactionChecker.h" -#include "PublishStats.h" ROCKETMQ_NAMESPACE_BEGIN @@ -53,8 +48,22 @@ public: void shutdown() override; + /** + * Note we require application to transfer ownership of the message to send to avoid concurrent modification during + * sending. + * + * Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>, facilitating + * application to conduct customized retry policy. + */ SendReceipt send(MessageConstPtr message, std::error_code& ec) noexcept; + /** + * Note we require application to transfer ownership of the message to send to avoid concurrent modification during + * sending. + * + * Regardless of the send result, SendReceipt would have the std::unique_ptr<const Message>, facilitating + * application to conduct customized retry policy. + */ void send(MessageConstPtr message, SendCallback callback); void setTransactionChecker(TransactionChecker checker); @@ -64,6 +73,13 @@ public: return absl::make_unique<TransactionImpl>(producer); } + /** + * Note we require application to transfer ownership of the message to send to avoid concurrent modification during + * sending. + * + * TODO: Refine this API.
Current API is not good enough as it cannot hand the message back to its caller on publish + * failure. + */ void send(MessageConstPtr message, std::error_code& ec, Transaction& transaction); /** diff --git a/cpp/source/rocketmq/include/SendContext.h b/cpp/source/rocketmq/include/SendContext.h index 4c05cebe..4067532b 100644 --- a/cpp/source/rocketmq/include/SendContext.h +++ b/cpp/source/rocketmq/include/SendContext.h @@ -19,16 +19,12 @@ #include <memory> #include <system_error> -#include "absl/container/flat_hash_map.h" -#include "absl/synchronization/mutex.h" -#include "opencensus/trace/span.h" - #include "Protocol.h" +#include "SendResult.h" #include "TransactionImpl.h" -#include "rocketmq/ErrorCode.h" +#include "opencensus/trace/span.h" #include "rocketmq/Message.h" #include "rocketmq/SendCallback.h" -#include "rocketmq/SendReceipt.h" ROCKETMQ_NAMESPACE_BEGIN @@ -47,7 +43,7 @@ public: span_(opencensus::trace::Span::BlankSpan()) { } - void onSuccess(const SendReceipt& send_receipt) noexcept; + void onSuccess(const SendResult& send_result) noexcept; void onFailure(const std::error_code& ec) noexcept; diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h index 45aa61b9..7fc63b6d 100644 --- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h +++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h @@ -25,7 +25,7 @@ using namespace std::chrono; ROCKETMQ_NAMESPACE_BEGIN -class SimpleConsumerImpl : public ClientImpl, public std::enable_shared_from_this<SimpleConsumerImpl> { +class SimpleConsumerImpl : virtual public ClientImpl, public std::enable_shared_from_this<SimpleConsumerImpl> { public: SimpleConsumerImpl(std::string group); diff --git a/cpp/source/rocketmq/tests/BUILD.bazel b/cpp/source/rocketmq/tests/BUILD.bazel index a8d10e92..74751c35 100644 --- a/cpp/source/rocketmq/tests/BUILD.bazel +++ b/cpp/source/rocketmq/tests/BUILD.bazel @@ -59,4 +59,12 @@ cc_test( "ConsumeMessageServiceTest.cpp", ], deps =
base_deps, +) + +cc_test( + name = "optional_test", + srcs = [ + "OptionalTest.cpp", + ], + deps = base_deps ) \ No newline at end of file diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/source/rocketmq/tests/OptionalTest.cpp similarity index 72% copy from cpp/include/rocketmq/SendReceipt.h copy to cpp/source/rocketmq/tests/OptionalTest.cpp index 489df5ec..1266a73e 100644 --- a/cpp/include/rocketmq/SendReceipt.h +++ b/cpp/source/rocketmq/tests/OptionalTest.cpp @@ -14,22 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#pragma once +#include "gtest/gtest.h" +#include "absl/types/optional.h" +#include "rocketmq/RocketMQ.h" -#include <cstdint> #include <string> -#include <utility> - -#include "RocketMQ.h" ROCKETMQ_NAMESPACE_BEGIN -struct SendReceipt { - std::string message_id; +TEST(OptionalTest, test_optional) { + absl::optional<std::string> opt{}; + ASSERT_EQ(false, opt.has_value()); - std::string transaction_id; + auto opt2 = absl::make_optional<std::string>(); + ASSERT_EQ(true, opt2.has_value()); +} - std::string target; -}; +ROCKETMQ_NAMESPACE_END -ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/stats/MetricBidiReactor.cpp b/cpp/source/stats/MetricBidiReactor.cpp index 22363bd0..e6921378 100644 --- a/cpp/source/stats/MetricBidiReactor.cpp +++ b/cpp/source/stats/MetricBidiReactor.cpp @@ -18,10 +18,10 @@ #include <chrono> -#include "rocketmq/Logger.h" -#include "spdlog/spdlog.h" #include "OpencensusExporter.h" #include "Signature.h" +#include "rocketmq/Logger.h" +#include "spdlog/spdlog.h" ROCKETMQ_NAMESPACE_BEGIN @@ -43,12 +43,15 @@ MetricBidiReactor::MetricBidiReactor(std::weak_ptr<Client> client, std::weak_ptr return; } exporter_ptr->stub()->async()->Export(&context_, this); + AddHold(); StartCall(); } void MetricBidiReactor::OnReadDone(bool ok) { if (!ok) { SPDLOG_WARN("Failed to read response"); + // match the AddHold() call in MetricBidiReactor::fireRead + 
RemoveHold(); return; } SPDLOG_DEBUG("OnReadDone OK"); @@ -56,16 +59,32 @@ void MetricBidiReactor::OnReadDone(bool ok) { } void MetricBidiReactor::OnWriteDone(bool ok) { + { + bool expected = true; + if (!inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) { + SPDLOG_WARN("Illegal command-inflight state"); + return; + } + } + if (!ok) { SPDLOG_WARN("Failed to report metrics"); + // match AddHold() call in MetricBidiReactor::MetricBidiReactor + RemoveHold(); return; } SPDLOG_DEBUG("OnWriteDone OK"); + + // If the read stream has not started yet, start it now. fireRead(); - bool expected = true; - if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) { - fireWrite(); + + // Remove the one that has been written. + { + absl::MutexLock lk(&requests_mtx_); + requests_.pop_front(); } + + tryWriteNext(); } void MetricBidiReactor::OnDone(const grpc::Status& s) { @@ -89,13 +108,13 @@ void MetricBidiReactor::write(ExportMetricsServiceRequest request) { SPDLOG_DEBUG("Append ExportMetricsServiceRequest to buffer"); { absl::MutexLock lk(&requests_mtx_); - requests_.emplace_back(std::move(request)); + requests_.push_back(std::move(request)); } - fireWrite(); + tryWriteNext(); } -void MetricBidiReactor::fireWrite() { +void MetricBidiReactor::tryWriteNext() { { absl::MutexLock lk(&requests_mtx_); if (requests_.empty()) { @@ -107,16 +126,15 @@ void MetricBidiReactor::fireWrite() { bool expected = false; if (inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) { absl::MutexLock lk(&requests_mtx_); - request_ = std::move(*requests_.begin()); - requests_.erase(requests_.begin()); SPDLOG_DEBUG("MetricBidiReactor#StartWrite"); - StartWrite(&request_); + StartWrite(&(requests_.front())); } } void MetricBidiReactor::fireRead() { bool expected = false; if (read_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) { + AddHold(); StartRead(&response_); } } diff --git
a/cpp/source/stats/include/MetricBidiReactor.h b/cpp/source/stats/include/MetricBidiReactor.h index 0a10cd88..e4d75344 100644 --- a/cpp/source/stats/include/MetricBidiReactor.h +++ b/cpp/source/stats/include/MetricBidiReactor.h @@ -16,6 +16,8 @@ */ #pragma once +#include <list> + #include "Client.h" #include "grpcpp/grpcpp.h" #include "grpcpp/impl/codegen/client_callback.h" @@ -25,12 +27,17 @@ ROCKETMQ_NAMESPACE_BEGIN class OpencensusExporter; -using ExportMetricsServiceRequest = opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest; -using ExportMetricsServiceResponse = opencensus::proto::agent::metrics::v1::ExportMetricsServiceResponse; +using ExportMetricsServiceRequest = + opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest; +using ExportMetricsServiceResponse = + opencensus::proto::agent::metrics::v1::ExportMetricsServiceResponse; -class MetricBidiReactor : public grpc::ClientBidiReactor<ExportMetricsServiceRequest, ExportMetricsServiceResponse> { +class MetricBidiReactor + : public grpc::ClientBidiReactor<ExportMetricsServiceRequest, + ExportMetricsServiceResponse> { public: - MetricBidiReactor(std::weak_ptr<Client> client, std::weak_ptr<OpencensusExporter> exporter); + MetricBidiReactor(std::weak_ptr<Client> client, + std::weak_ptr<OpencensusExporter> exporter); /// Notifies the application that a StartRead operation completed. /// @@ -52,7 +59,7 @@ public: /// (like failure to remove a hold). 
/// /// \param[in] s The status outcome of this RPC - void OnDone(const grpc::Status& /*s*/) override; + void OnDone(const grpc::Status & /*s*/) override; void write(ExportMetricsServiceRequest request) LOCKS_EXCLUDED(requests_mtx_); @@ -61,9 +68,8 @@ private: std::weak_ptr<OpencensusExporter> exporter_; grpc::ClientContext context_; - ExportMetricsServiceRequest request_; - - std::vector<ExportMetricsServiceRequest> requests_ GUARDED_BY(requests_mtx_); + /// Pending ExportMetricsServiceRequest items to write to server + std::list<ExportMetricsServiceRequest> requests_ GUARDED_BY(requests_mtx_); absl::Mutex requests_mtx_; std::atomic_bool inflight_{false}; @@ -71,7 +77,7 @@ private: ExportMetricsServiceResponse response_; - void fireWrite(); + void tryWriteNext(); void fireRead(); }; diff --git a/cpp/tools/trouble_shooting.sh b/cpp/tools/gen_compile_commands.sh old mode 100644 new mode 100755 similarity index 83% copy from cpp/tools/trouble_shooting.sh copy to cpp/tools/gen_compile_commands.sh index 06d3795f..5ffc239c --- a/cpp/tools/trouble_shooting.sh +++ b/cpp/tools/gen_compile_commands.sh @@ -1,3 +1,4 @@ +#!/usr/bin/env bash # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -12,6 +13,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +TOOLS_DIR=$(dirname "$0") +WORKSPACE_DIR=$(dirname "$TOOLS_DIR") +cd $WORKSPACE_DIR -export GRPC_VERBOSITY=debug -export GRPC_TRACE=tcp,http,api \ No newline at end of file +bazel run @hedron_compile_commands//:refresh_all \ No newline at end of file diff --git a/cpp/tools/trouble_shooting.sh b/cpp/tools/trouble_shooting.sh old mode 100644 new mode 100755