This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new c38f7d55 fix #639 fix semantics of topicOfInterest (#642)
c38f7d55 is described below

commit c38f7d55cbceebb122c47f25a6cad72af210066c
Author: Zhanhui Li <lizhan...@gmail.com>
AuthorDate: Fri Dec 1 15:48:59 2023 +0800

    fix #639 fix semantics of topicOfInterest (#642)
    
    Signed-off-by: Zhanhui Li <lizhan...@apache.org>
---
 cpp/.gitignore                                   |  1 +
 cpp/source/rocketmq/Producer.cpp                 |  2 +-
 cpp/source/rocketmq/ProducerImpl.cpp             | 18 ++++++++++++++++--
 cpp/source/rocketmq/PushConsumerImpl.cpp         |  2 +-
 cpp/source/rocketmq/SimpleConsumerImpl.cpp       |  2 +-
 cpp/source/rocketmq/include/ClientImpl.h         |  2 +-
 cpp/source/rocketmq/include/ProducerImpl.h       |  4 +++-
 cpp/source/rocketmq/include/PushConsumerImpl.h   |  2 +-
 cpp/source/rocketmq/include/SimpleConsumerImpl.h |  2 +-
 9 files changed, 26 insertions(+), 9 deletions(-)

diff --git a/cpp/.gitignore b/cpp/.gitignore
index 23e0e933..b7f10c0f 100644
--- a/cpp/.gitignore
+++ b/cpp/.gitignore
@@ -18,3 +18,4 @@ bazel-rocketmq-client-cpp
 /bazel-*
 /compile_commands.json
 /.cache/
+.clangd
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 8620f681..78d812ed 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -88,7 +88,7 @@ ProducerBuilder& ProducerBuilder::withConfiguration(Configuration configuration)
 }
 
 ProducerBuilder& ProducerBuilder::withTopics(const std::vector<std::string>& topics) {
-  impl_->topicsOfInterest(topics);
+  impl_->withTopics(topics);
   return *this;
 }
 
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp
index 32b2ecad..73130161 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -16,6 +16,7 @@
  */
 #include "ProducerImpl.h"
 
+#include <algorithm>
 #include <apache/rocketmq/v2/definition.pb.h>
 
 #include <atomic>
@@ -575,9 +576,22 @@ void ProducerImpl::onOrphanedTransactionalMessage(MessageConstSharedPtr message)
   }
 }
 
-void ProducerImpl::topicsOfInterest(std::vector<std::string> topics) {
+void ProducerImpl::topicsOfInterest(std::vector<std::string> &topics) {
   absl::MutexLock lk(&topics_mtx_);
-  topics_.swap(topics);
+  for (auto& topic : topics_) {
+    if (std::find(topics.begin(), topics.end(), topic) == topics.end()) {
+      topics.push_back(topic);
+    }
+  }
+}
+
+void ProducerImpl::withTopics(const std::vector<std::string> &topics) {
+  absl::MutexLock lk(&topics_mtx_);
+  for (auto &topic: topics) {
+    if (std::find(topics_.begin(), topics_.end(), topic) == topics_.end()) {
+      topics_.push_back(topic);
+    }
+  }
 }
 
 void ProducerImpl::buildClientSettings(rmq::Settings& settings) {
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp
index d73407b4..505854db 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -47,7 +47,7 @@ PushConsumerImpl::~PushConsumerImpl() {
   shutdown();
 }
 
-void PushConsumerImpl::topicsOfInterest(std::vector<std::string> topics) {
+void PushConsumerImpl::topicsOfInterest(std::vector<std::string> &topics) {
   absl::MutexLock lk(&topic_filter_expression_table_mtx_);
   for (const auto& entry : topic_filter_expression_table_) {
     topics.push_back(entry.first);
diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
index 09acb7ab..7a1b3edf 100644
--- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
@@ -67,7 +67,7 @@ void SimpleConsumerImpl::buildClientSettings(rmq::Settings& settings) {
   }
 }
 
-void SimpleConsumerImpl::topicsOfInterest(std::vector<std::string> topics) {
+void SimpleConsumerImpl::topicsOfInterest(std::vector<std::string> &topics) {
   absl::MutexLock lk(&subscriptions_mtx_);
   for (const auto& entry : subscriptions_) {
     if (std::find(topics.begin(), topics.end(), entry.first) == topics.end()) {
diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h
index 70dc5382..c266047a 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -167,7 +167,7 @@ protected:
   absl::flat_hash_map<std::string, std::unique_ptr<Session>> session_map_ GUARDED_BY(session_map_mtx_);
   absl::Mutex session_map_mtx_;
 
-  virtual void topicsOfInterest(std::vector<std::string> topics) {
+  virtual void topicsOfInterest(std::vector<std::string> &topics) {
   }
 
   void updateRouteInfo() LOCKS_EXCLUDED(topic_route_table_mtx_);
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h b/cpp/source/rocketmq/include/ProducerImpl.h
index ad9b24d5..d7260a93 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -107,7 +107,9 @@ public:
 
   void buildClientSettings(rmq::Settings& settings) override;
 
-  void topicsOfInterest(std::vector<std::string> topics) override LOCKS_EXCLUDED(topics_mtx_);
+  void topicsOfInterest(std::vector<std::string> &topics) override LOCKS_EXCLUDED(topics_mtx_);
+
+  void withTopics(const std::vector<std::string> &topics) LOCKS_EXCLUDED(topics_mtx_);
 
   const PublishStats& stats() const {
     return stats_;
diff --git a/cpp/source/rocketmq/include/PushConsumerImpl.h b/cpp/source/rocketmq/include/PushConsumerImpl.h
index d512f4c8..7a4ff1a3 100644
--- a/cpp/source/rocketmq/include/PushConsumerImpl.h
+++ b/cpp/source/rocketmq/include/PushConsumerImpl.h
@@ -52,7 +52,7 @@ public:
 
   void prepareHeartbeatData(HeartbeatRequest& request) override;
 
-  void topicsOfInterest(std::vector<std::string> topics) override LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
+  void topicsOfInterest(std::vector<std::string> &topics) override LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
 
   void start() override;
 
diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
index a20cce56..45aa61b9 100644
--- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
+++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
@@ -66,7 +66,7 @@ public:
   }
 
 protected:
-  void topicsOfInterest(std::vector<std::string> topics) override;
+  void topicsOfInterest(std::vector<std::string> &topics) override;
 
 private:
   absl::flat_hash_map<std::string, FilterExpression> subscriptions_ GUARDED_BY(subscriptions_mtx_);

Reply via email to