This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new c38f7d55 fix #639 fix semantics of topicOfInterest (#642) c38f7d55 is described below commit c38f7d55cbceebb122c47f25a6cad72af210066c Author: Zhanhui Li <lizhan...@gmail.com> AuthorDate: Fri Dec 1 15:48:59 2023 +0800 fix #639 fix semantics of topicOfInterest (#642) Signed-off-by: Zhanhui Li <lizhan...@apache.org> --- cpp/.gitignore | 1 + cpp/source/rocketmq/Producer.cpp | 2 +- cpp/source/rocketmq/ProducerImpl.cpp | 18 ++++++++++++++++-- cpp/source/rocketmq/PushConsumerImpl.cpp | 2 +- cpp/source/rocketmq/SimpleConsumerImpl.cpp | 2 +- cpp/source/rocketmq/include/ClientImpl.h | 2 +- cpp/source/rocketmq/include/ProducerImpl.h | 4 +++- cpp/source/rocketmq/include/PushConsumerImpl.h | 2 +- cpp/source/rocketmq/include/SimpleConsumerImpl.h | 2 +- 9 files changed, 26 insertions(+), 9 deletions(-) diff --git a/cpp/.gitignore b/cpp/.gitignore index 23e0e933..b7f10c0f 100644 --- a/cpp/.gitignore +++ b/cpp/.gitignore @@ -18,3 +18,4 @@ bazel-rocketmq-client-cpp /bazel-* /compile_commands.json /.cache/ +.clangd diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp index 8620f681..78d812ed 100644 --- a/cpp/source/rocketmq/Producer.cpp +++ b/cpp/source/rocketmq/Producer.cpp @@ -88,7 +88,7 @@ ProducerBuilder& ProducerBuilder::withConfiguration(Configuration configuration) } ProducerBuilder& ProducerBuilder::withTopics(const std::vector<std::string>& topics) { - impl_->topicsOfInterest(topics); + impl_->withTopics(topics); return *this; } diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp index 32b2ecad..73130161 100644 --- a/cpp/source/rocketmq/ProducerImpl.cpp +++ b/cpp/source/rocketmq/ProducerImpl.cpp @@ -16,6 +16,7 @@ */ #include "ProducerImpl.h" +#include <algorithm> #include <apache/rocketmq/v2/definition.pb.h> #include <atomic> @@ -575,9 +576,22 @@ void ProducerImpl::onOrphanedTransactionalMessage(MessageConstSharedPtr message) } } -void ProducerImpl::topicsOfInterest(std::vector<std::string> topics) { +void ProducerImpl::topicsOfInterest(std::vector<std::string> &topics) { absl::MutexLock lk(&topics_mtx_); - topics_.swap(topics); + for (auto& topic : topics_) { + if (std::find(topics.begin(), topics.end(), topic) == topics.end()) { + topics.push_back(topic); + } + } +} + +void ProducerImpl::withTopics(const std::vector<std::string> &topics) { + absl::MutexLock lk(&topics_mtx_); + for (auto &topic: topics) { + if (std::find(topics_.begin(), topics_.end(), topic) == topics_.end()) { + topics_.push_back(topic); + } + } } void ProducerImpl::buildClientSettings(rmq::Settings& settings) { diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp index d73407b4..505854db 100644 --- a/cpp/source/rocketmq/PushConsumerImpl.cpp +++ b/cpp/source/rocketmq/PushConsumerImpl.cpp @@ -47,7 +47,7 @@ PushConsumerImpl::~PushConsumerImpl() { shutdown(); } -void PushConsumerImpl::topicsOfInterest(std::vector<std::string> topics) { +void PushConsumerImpl::topicsOfInterest(std::vector<std::string> &topics) { absl::MutexLock lk(&topic_filter_expression_table_mtx_); for (const auto& entry : topic_filter_expression_table_) { topics.push_back(entry.first); diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp b/cpp/source/rocketmq/SimpleConsumerImpl.cpp index 09acb7ab..7a1b3edf 100644 --- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp +++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp @@ -67,7 +67,7 @@ void SimpleConsumerImpl::buildClientSettings(rmq::Settings& settings) { } } -void SimpleConsumerImpl::topicsOfInterest(std::vector<std::string> topics) { +void SimpleConsumerImpl::topicsOfInterest(std::vector<std::string> &topics) { absl::MutexLock lk(&subscriptions_mtx_); for (const auto& entry : subscriptions_) { if (std::find(topics.begin(), topics.end(), entry.first) == topics.end()) { diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h index 70dc5382..c266047a 100644 --- a/cpp/source/rocketmq/include/ClientImpl.h +++ b/cpp/source/rocketmq/include/ClientImpl.h @@ -167,7 +167,7 @@ protected: absl::flat_hash_map<std::string, std::unique_ptr<Session>> session_map_ GUARDED_BY(session_map_mtx_); absl::Mutex session_map_mtx_; - virtual void topicsOfInterest(std::vector<std::string> topics) { + virtual void topicsOfInterest(std::vector<std::string> &topics) { } void updateRouteInfo() LOCKS_EXCLUDED(topic_route_table_mtx_); diff --git a/cpp/source/rocketmq/include/ProducerImpl.h b/cpp/source/rocketmq/include/ProducerImpl.h index ad9b24d5..d7260a93 100644 --- a/cpp/source/rocketmq/include/ProducerImpl.h +++ b/cpp/source/rocketmq/include/ProducerImpl.h @@ -107,7 +107,9 @@ public: void buildClientSettings(rmq::Settings& settings) override; - void topicsOfInterest(std::vector<std::string> topics) override LOCKS_EXCLUDED(topics_mtx_); + void topicsOfInterest(std::vector<std::string> &topics) override LOCKS_EXCLUDED(topics_mtx_); + + void withTopics(const std::vector<std::string> &topics) LOCKS_EXCLUDED(topics_mtx_); const PublishStats& stats() const { return stats_; diff --git a/cpp/source/rocketmq/include/PushConsumerImpl.h b/cpp/source/rocketmq/include/PushConsumerImpl.h index d512f4c8..7a4ff1a3 100644 --- a/cpp/source/rocketmq/include/PushConsumerImpl.h +++ b/cpp/source/rocketmq/include/PushConsumerImpl.h @@ -52,7 +52,7 @@ public: void prepareHeartbeatData(HeartbeatRequest& request) override; - void topicsOfInterest(std::vector<std::string> topics) override LOCKS_EXCLUDED(topic_filter_expression_table_mtx_); + void topicsOfInterest(std::vector<std::string> &topics) override LOCKS_EXCLUDED(topic_filter_expression_table_mtx_); void start() override; diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h index a20cce56..45aa61b9 100644 --- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h +++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h @@ -66,7 +66,7 @@ public: } protected: - void topicsOfInterest(std::vector<std::string> topics) override; + void topicsOfInterest(std::vector<std::string> &topics) override; private: absl::flat_hash_map<std::string, FilterExpression> subscriptions_ GUARDED_BY(subscriptions_mtx_);