This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch fifo_opt
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/fifo_opt by this push:
     new 8a5704ca fix: log sending sending stages
8a5704ca is described below

commit 8a5704cafb46449587b9c06108640a0b00bd4942
Author: Li Zhanhui <lizhan...@gmail.com>
AuthorDate: Sun Apr 14 22:19:57 2024 +0800

    fix: log sending sending stages
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
---
 cpp/examples/ExampleFifoProducer.cpp                |  5 +++--
 cpp/source/rocketmq/FifoProducerPartition.cpp       | 18 +++++++++++++++++-
 cpp/source/rocketmq/include/FifoProducerImpl.h      |  3 ++-
 cpp/source/rocketmq/include/FifoProducerPartition.h |  4 +++-
 cpp/tools/trouble_shooting.sh                       |  0
 5 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/cpp/examples/ExampleFifoProducer.cpp b/cpp/examples/ExampleFifoProducer.cpp
index ff3a14db..9d99be36 100644
--- a/cpp/examples/ExampleFifoProducer.cpp
+++ b/cpp/examples/ExampleFifoProducer.cpp
@@ -106,8 +106,8 @@ int main(int argc, char* argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
 
   auto& logger = getLogger();
-  logger.setConsoleLevel(Level::Info);
-  logger.setLevel(Level::Info);
+  logger.setConsoleLevel(Level::Debug);
+  logger.setLevel(Level::Debug);
   logger.init();
 
   // Access Key/Secret pair may be acquired from management console
@@ -173,6 +173,7 @@ int main(int argc, char* argv[]) {
 
       semaphore->acquire();
       producer.send(std::move(message), callback);
+      std::cout << "Cached No." << i << " message" << std::endl;
     }
   } catch (...) {
     std::cerr << "Ah...No!!!" << std::endl;
diff --git a/cpp/source/rocketmq/FifoProducerPartition.cpp b/cpp/source/rocketmq/FifoProducerPartition.cpp
index 37526f76..94e1c722 100644
--- a/cpp/source/rocketmq/FifoProducerPartition.cpp
+++ b/cpp/source/rocketmq/FifoProducerPartition.cpp
@@ -19,6 +19,7 @@ void FifoProducerPartition::add(FifoContext&& context) {
   {
     absl::MutexLock lk(&messages_mtx_);
     messages_.emplace_back(std::move(context));
+    SPDLOG_DEBUG("{} has {} pending messages after #add", name_, 
messages_.size());
   }
 
   trySend();
@@ -29,6 +30,11 @@ void FifoProducerPartition::trySend() {
   if (inflight_.compare_exchange_strong(expected, true, 
std::memory_order_relaxed)) {
     absl::MutexLock lk(&messages_mtx_);
 
+    if (messages_.empty()) {
+      SPDLOG_DEBUG("There is no more messages to send");
+      return;
+    }
+
     FifoContext& ctx = messages_.front();
     MessageConstPtr message = std::move(ctx.message);
     SendCallback send_callback = ctx.callback;
@@ -37,12 +43,22 @@ void FifoProducerPartition::trySend() {
     auto fifo_callback = [=](const std::error_code& ec, const SendReceipt& 
receipt) mutable {
       partition->onComplete(ec, receipt, send_callback);
     };
+    SPDLOG_DEBUG("Sending FIFO message from {}", name_);
     producer_->send(std::move(message), fifo_callback);
     messages_.pop_front();
+    SPDLOG_DEBUG("In addition to the inflight one, there is {} messages 
pending in {}", messages_.size(), name_);
+  } else {
+    SPDLOG_DEBUG("There is an inflight message");
   }
 }
 
 void FifoProducerPartition::onComplete(const std::error_code& ec, const 
SendReceipt& receipt, SendCallback& callback) {
+  if (ec) {
+    SPDLOG_INFO("{} completed with a failure: {}", name_, ec.message());
+  } else {
+    SPDLOG_DEBUG("{} completed OK", name_);
+  }
+
   if (!ec) {
     callback(ec, receipt);
     // update inflight status
@@ -50,7 +66,7 @@ void FifoProducerPartition::onComplete(const std::error_code& ec, const SendRece
     if (inflight_.compare_exchange_strong(expected, false, 
std::memory_order_relaxed)) {
       trySend();
     } else {
-      SPDLOG_ERROR("Unexpected inflight status");
+      SPDLOG_ERROR("{}: Unexpected inflight status", name_);
     }
     return;
   }
diff --git a/cpp/source/rocketmq/include/FifoProducerImpl.h b/cpp/source/rocketmq/include/FifoProducerImpl.h
index cc11dcf6..180c3f93 100644
--- a/cpp/source/rocketmq/include/FifoProducerImpl.h
+++ b/cpp/source/rocketmq/include/FifoProducerImpl.h
@@ -6,6 +6,7 @@
 
 #include "FifoProducerPartition.h"
 #include "ProducerImpl.h"
+#include "fmt/format.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/SendCallback.h"
 
@@ -16,7 +17,7 @@ public:
   FifoProducerImpl(std::shared_ptr<ProducerImpl> producer, std::size_t 
concurrency)
       : producer_(producer), concurrency_(concurrency), 
partitions_(concurrency) {
     for (auto i = 0; i < concurrency; i++) {
-      partitions_[i] = std::make_shared<FifoProducerPartition>(producer_);
+      partitions_[i] = std::make_shared<FifoProducerPartition>(producer_, 
fmt::format("slot-{}", i));
     }
   };
 
diff --git a/cpp/source/rocketmq/include/FifoProducerPartition.h b/cpp/source/rocketmq/include/FifoProducerPartition.h
index 406b8fa6..96bb96f6 100644
--- a/cpp/source/rocketmq/include/FifoProducerPartition.h
+++ b/cpp/source/rocketmq/include/FifoProducerPartition.h
@@ -18,7 +18,8 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 class FifoProducerPartition : public 
std::enable_shared_from_this<FifoProducerPartition> {
 public:
-  FifoProducerPartition(std::shared_ptr<ProducerImpl> producer) : 
producer_(producer) {
+  FifoProducerPartition(std::shared_ptr<ProducerImpl> producer, std::string&& 
name)
+      : producer_(producer), name_(std::move(name)) {
   }
 
   void add(FifoContext&& context) LOCKS_EXCLUDED(messages_mtx_);
@@ -32,6 +33,7 @@ private:
   std::list<FifoContext> messages_ GUARDED_BY(messages_mtx_);
   absl::Mutex messages_mtx_;
   std::atomic_bool inflight_{false};
+  std::string name_;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/tools/trouble_shooting.sh b/cpp/tools/trouble_shooting.sh
old mode 100644
new mode 100755

Reply via email to