This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new afd4f56d [ISSUE #928] Fix C++ push consumer handle error code and change demo log level (#932)
afd4f56d is described below

commit afd4f56d0d310852fbea836f6c1e8b5eec7bae2f
Author: lizhimins <707364...@qq.com>
AuthorDate: Thu Jan 23 11:41:11 2025 +0800

    [ISSUE #928] Fix C++ push consumer handle error code and change demo log level (#932)
---
 cpp/examples/ExampleFifoProducer.cpp                | 6 +++---
 cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp | 5 +++++
 cpp/source/rocketmq/SimpleConsumerImpl.cpp          | 4 ++--
 3 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/cpp/examples/ExampleFifoProducer.cpp b/cpp/examples/ExampleFifoProducer.cpp
index 1e7829d4..1876ebb1 100644
--- a/cpp/examples/ExampleFifoProducer.cpp
+++ b/cpp/examples/ExampleFifoProducer.cpp
@@ -105,8 +105,8 @@ int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
 
   auto& logger = getLogger();
-  logger.setConsoleLevel(Level::Debug);
-  logger.setLevel(Level::Debug);
+  logger.setConsoleLevel(Level::Info);
+  logger.setLevel(Level::Info);
   logger.init();
 
   // Access Key/Secret pair may be acquired from management console
@@ -172,7 +172,7 @@ int main(int argc, char* argv[]) {
 
       semaphore->acquire();
       producer.send(std::move(message), callback);
-      std::cout << "Cached No." << i << " message" << std::endl;
+      // std::cout << "Cached No." << i << " message" << std::endl;
     }
   } catch (...) {
     std::cerr << "Ah...No!!!" << std::endl;
diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
index 1c96b095..f68c9f88 100644
--- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -51,6 +51,11 @@ void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const
     return;
   }
 
+  if (ec == ErrorCode::NoContent) {
+    checkThrottleThenReceive();
+    return;
+  }
+
   if (ec) {
     SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1 second.", process_queue->simpleName(), ec.message());
     receiveMessageLater(std::chrono::seconds (1));
diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
index df060793..e0a78eeb 100644
--- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
@@ -95,11 +95,11 @@ void SimpleConsumerImpl::start() {
       }
     };
 
-    // refer java sdk: set refresh interval to 30 seconds
+    // refer java sdk: set refresh interval to 5 seconds
     // org.apache.rocketmq.client.java.impl.ClientImpl#startUp
     refresh_assignment_task_ = manager()->getScheduler()->schedule(
         refresh_assignment_task, "RefreshAssignmentTask",
-        std::chrono::minutes(5), std::chrono::seconds(5));
+        std::chrono::seconds(5), std::chrono::seconds(5));
 
     client_manager_->addClientObserver(shared_from_this());
   }

Reply via email to