This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
     new afd4f56d [ISSUE #928] Fix C++ push consumer handle error code and change demo log level (#932)
afd4f56d is described below

commit afd4f56d0d310852fbea836f6c1e8b5eec7bae2f
Author: lizhimins <707364...@qq.com>
AuthorDate: Thu Jan 23 11:41:11 2025 +0800

    [ISSUE #928] Fix C++ push consumer handle error code and change demo log level (#932)
---
 cpp/examples/ExampleFifoProducer.cpp                | 6 +++---
 cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp | 5 +++++
 cpp/source/rocketmq/SimpleConsumerImpl.cpp          | 4 ++--
 3 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/cpp/examples/ExampleFifoProducer.cpp b/cpp/examples/ExampleFifoProducer.cpp
index 1e7829d4..1876ebb1 100644
--- a/cpp/examples/ExampleFifoProducer.cpp
+++ b/cpp/examples/ExampleFifoProducer.cpp
@@ -105,8 +105,8 @@ int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);

   auto& logger = getLogger();
-  logger.setConsoleLevel(Level::Debug);
-  logger.setLevel(Level::Debug);
+  logger.setConsoleLevel(Level::Info);
+  logger.setLevel(Level::Info);
   logger.init();

   // Access Key/Secret pair may be acquired from management console
@@ -172,7 +172,7 @@ int main(int argc, char* argv[]) {

       semaphore->acquire();
       producer.send(std::move(message), callback);
-      std::cout << "Cached No." << i << " message" << std::endl;
+      // std::cout << "Cached No." << i << " message" << std::endl;
     }
   } catch (...) {
     std::cerr << "Ah...No!!!" << std::endl;
diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
index 1c96b095..f68c9f88 100644
--- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -51,6 +51,11 @@ void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const
     return;
   }

+  if (ec == ErrorCode::NoContent) {
+    checkThrottleThenReceive();
+    return;
+  }
+
   if (ec) {
     SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1 second.", process_queue->simpleName(), ec.message());
     receiveMessageLater(std::chrono::seconds (1));
diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
index df060793..e0a78eeb 100644
--- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
@@ -95,11 +95,11 @@ void SimpleConsumerImpl::start() {
     }
   };

-  // refer java sdk: set refresh interval to 30 seconds
+  // refer java sdk: set refresh interval to 5 seconds
   // org.apache.rocketmq.client.java.impl.ClientImpl#startUp
   refresh_assignment_task_ = manager()->getScheduler()->schedule(
       refresh_assignment_task, "RefreshAssignmentTask",
-      std::chrono::minutes(5), std::chrono::seconds(5));
+      std::chrono::seconds(5), std::chrono::seconds(5));

   client_manager_->addClientObserver(shared_from_this());
 }