This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new eeeb643d [ISSUE #928] [CPP] Fix some cpp client bug and make logs cleaner (#929)
eeeb643d is described below

commit eeeb643d8f74779ea38a111a78cdd4b7785f560a
Author: lizhimins <707364...@qq.com>
AuthorDate: Tue Jan 21 21:16:03 2025 +0800

    [ISSUE #928] [CPP] Fix some cpp client bug and make logs cleaner (#929)
---
 .github/workflows/cpp_build.yml                    |  2 +-
 cpp/.gitignore                                     |  1 +
 cpp/examples/ExampleFifoProducer.cpp               |  5 ++---
 cpp/examples/ExampleProducer.cpp                   |  4 ++--
 cpp/examples/ExampleProducerWithAsync.cpp          |  4 ++--
 cpp/examples/ExampleProducerWithFifoMessage.cpp    |  4 ++--
 cpp/examples/ExampleProducerWithTimedMessage.cpp   |  5 ++---
 .../ExampleProducerWithTransactionalMessage.cpp    |  4 ++--
 cpp/examples/ExamplePushConsumer.cpp               |  6 +++---
 cpp/examples/ExampleSimpleConsumer.cpp             |  6 +++---
 cpp/source/base/include/InvocationContext.h        |  4 ++--
 cpp/source/client/ClientManagerImpl.cpp            | 11 ++++++-----
 cpp/source/client/LogInterceptor.cpp               | 14 +++++++-------
 cpp/source/client/RpcClientImpl.cpp                |  2 --
 cpp/source/client/SessionImpl.cpp                  |  2 +-
 cpp/source/client/TelemetryBidiReactor.cpp         | 22 ++++++++++------------
 cpp/source/client/include/TopicRouteData.h         |  2 +-
 cpp/source/log/LoggerImpl.cpp                      |  3 ++-
 cpp/source/rocketmq/ClientImpl.cpp                 | 13 +++++--------
 cpp/source/rocketmq/ProducerImpl.cpp               |  8 ++++++--
 cpp/source/rocketmq/include/ClientImpl.h           |  1 +
 cpp/source/rocketmq/include/ProducerImpl.h         |  2 +-
 22 files changed, 62 insertions(+), 63 deletions(-)

diff --git a/.github/workflows/cpp_build.yml b/.github/workflows/cpp_build.yml
index 7973881a..5dc0302b 100644
--- a/.github/workflows/cpp_build.yml
+++ b/.github/workflows/cpp_build.yml
@@ -11,7 +11,7 @@ jobs:
         # Disable VS 2022 before https://github.com/bazelbuild/bazel/issues/18592 issue is solved
         # Remove macos-11 since there is no such runner available
         # os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019, windows-2022]
-        os: [ubuntu-20.04, ubuntu-22.04, macos-12, windows-2019]
+        os: [ubuntu-20.04, ubuntu-22.04, windows-2019]
     steps:
       - uses: actions/checkout@v2
       - name: Compile On Linux
diff --git a/cpp/.gitignore b/cpp/.gitignore
index b7f10c0f..551fa09a 100644
--- a/cpp/.gitignore
+++ b/cpp/.gitignore
@@ -19,3 +19,4 @@ bazel-rocketmq-client-cpp
 /compile_commands.json
 /.cache/
 .clangd
+build
\ No newline at end of file
diff --git a/cpp/examples/ExampleFifoProducer.cpp b/cpp/examples/ExampleFifoProducer.cpp
index 9d99be36..1e7829d4 100644
--- a/cpp/examples/ExampleFifoProducer.cpp
+++ b/cpp/examples/ExampleFifoProducer.cpp
@@ -28,7 +28,6 @@
 #include "rocketmq/FifoProducer.h"
 #include "rocketmq/Logger.h"
 #include "rocketmq/Message.h"
-#include "rocketmq/Producer.h"
 #include "rocketmq/SendReceipt.h"
 
 using namespace ROCKETMQ_NAMESPACE;
@@ -93,8 +92,8 @@ std::string randomString(std::string::size_type len) {
   return result;
 }
 
-DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are 
published");
-DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, 
provided by your service provider");
+DEFINE_string(topic, "FifoTopic", "Topic to which messages are published");
+DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by 
your service provider");
 DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
 DEFINE_string(access_key, "", "Your access key ID");
diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp
index 5e20cc12..8c6011d8 100644
--- a/cpp/examples/ExampleProducer.cpp
+++ b/cpp/examples/ExampleProducer.cpp
@@ -51,8 +51,8 @@ std::string randomString(std::string::size_type len) {
   return result;
 }
 
-DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are 
published");
-DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, 
provided by your service provider");
+DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
+DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by 
your service provider");
 DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
 DEFINE_string(access_key, "", "Your access key ID");
diff --git a/cpp/examples/ExampleProducerWithAsync.cpp b/cpp/examples/ExampleProducerWithAsync.cpp
index d88dfc85..a46fdf43 100644
--- a/cpp/examples/ExampleProducerWithAsync.cpp
+++ b/cpp/examples/ExampleProducerWithAsync.cpp
@@ -89,8 +89,8 @@ std::string randomString(std::string::size_type len) {
   return result;
 }
 
-DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are 
published");
-DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, 
provided by your service provider");
+DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
+DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by 
your service provider");
 DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
 DEFINE_uint32(concurrency, 128, "Concurrency of async send");
diff --git a/cpp/examples/ExampleProducerWithFifoMessage.cpp b/cpp/examples/ExampleProducerWithFifoMessage.cpp
index 4fa34f9d..2d6789ba 100644
--- a/cpp/examples/ExampleProducerWithFifoMessage.cpp
+++ b/cpp/examples/ExampleProducerWithFifoMessage.cpp
@@ -48,8 +48,8 @@ std::string randomString(std::string::size_type len) {
   return result;
 }
 
-DEFINE_string(topic, "fifo_topic_sample", "Topic to which messages are 
published");
-DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, 
provided by your service provider");
+DEFINE_string(topic, "FifoTopic", "Topic to which messages are published");
+DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by 
your service provider");
 DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
 DEFINE_string(access_key, "", "Your access key ID");
diff --git a/cpp/examples/ExampleProducerWithTimedMessage.cpp b/cpp/examples/ExampleProducerWithTimedMessage.cpp
index d6237459..ba2a45f7 100644
--- a/cpp/examples/ExampleProducerWithTimedMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp
@@ -17,7 +17,6 @@
 #include <algorithm>
 #include <atomic>
 #include <chrono>
-#include <cstddef>
 #include <iostream>
 #include <random>
 #include <string>
@@ -50,8 +49,8 @@ std::string randomString(std::string::size_type len) {
   return result;
 }
 
-DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are 
published");
-DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, 
provided by your service provider");
+DEFINE_string(topic, "TimerTopic", "Topic to which messages are published");
+DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by 
your service provider");
 DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
 DEFINE_string(access_key, "", "Your access key ID");
diff --git a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
index 50620c5a..f595c6ef 100644
--- a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
@@ -48,8 +48,8 @@ std::string randomString(std::string::size_type len) {
   return result;
 }
 
-DEFINE_string(topic, "tx_topic_sample", "Topic to which messages are 
published");
-DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, 
provided by your service provider");
+DEFINE_string(topic, "TransTopic", "Topic to which messages are published");
+DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by 
your service provider");
 DEFINE_int32(message_body_size, 4096, "Message body size");
 DEFINE_uint32(total, 256, "Number of sample messages to publish");
 DEFINE_string(access_key, "", "Your access key ID");
diff --git a/cpp/examples/ExamplePushConsumer.cpp b/cpp/examples/ExamplePushConsumer.cpp
index 66a85f4b..7017ec92 100644
--- a/cpp/examples/ExamplePushConsumer.cpp
+++ b/cpp/examples/ExamplePushConsumer.cpp
@@ -24,9 +24,9 @@
 
 using namespace ROCKETMQ_NAMESPACE;
 
-DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are 
published");
-DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, 
provided by your service provider");
-DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through 
your instance management console");
+DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
+DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by 
your service provider");
+DEFINE_string(group, "PushConsumer", "GroupId, created through your instance 
management console");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
 DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
diff --git a/cpp/examples/ExampleSimpleConsumer.cpp b/cpp/examples/ExampleSimpleConsumer.cpp
index aedec71e..41262ad0 100644
--- a/cpp/examples/ExampleSimpleConsumer.cpp
+++ b/cpp/examples/ExampleSimpleConsumer.cpp
@@ -23,9 +23,9 @@
 
 using namespace ROCKETMQ_NAMESPACE;
 
-DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are 
published");
-DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, 
provided by your service provider");
-DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through 
your instance management console");
+DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
+DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by 
your service provider");
+DEFINE_string(group, "SimpleConsumer", "GroupId, created through your instance 
management console");
 DEFINE_string(access_key, "", "Your access key ID");
 DEFINE_string(access_secret, "", "Your access secret");
 DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
diff --git a/cpp/source/base/include/InvocationContext.h b/cpp/source/base/include/InvocationContext.h
index 0e138c81..dd6864bf 100644
--- a/cpp/source/base/include/InvocationContext.h
+++ b/cpp/source/base/include/InvocationContext.h
@@ -81,8 +81,8 @@ struct InvocationContext : public BaseInvocationContext {
 
     if (!status.ok() && grpc::StatusCode::DEADLINE_EXCEEDED == status.error_code()) {
       auto diff =
-          std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - context.deadline())
-              .count();
+          std::chrono::duration_cast<std::chrono::milliseconds>(
+              std::chrono::system_clock::now() - context.deadline()).count();
       SPDLOG_WARN("Asynchronous RPC[{}.{}] timed out, elapsing {}ms, deadline-over-due: {}ms",
                   absl::FormatTime(created_time, absl::UTCTimeZone()), 
elapsed, diff);
     }
diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp
index 7d724c7b..bb1e2e67 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -48,6 +48,7 @@ ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_s
       state_(State::CREATED),
       
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
       with_ssl_(with_ssl) {
+
   certificate_verifier_ = 
grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>();
   tls_channel_credential_options_.set_verify_server_certs(false);
   tls_channel_credential_options_.set_check_call_host(false);
@@ -78,7 +79,7 @@ ClientManagerImpl::ClientManagerImpl(std::string 
resource_namespace, bool with_s
    */
   channel_arguments_.SetInt(GRPC_ARG_ENABLE_RETRIES, 0);
 
-  channel_arguments_.SetSslTargetNameOverride("localhost");
+  // channel_arguments_.SetSslTargetNameOverride("localhost");
 
   SPDLOG_INFO("ClientManager[ResourceNamespace={}] created", 
resource_namespace_);
 }
@@ -282,7 +283,7 @@ bool ClientManagerImpl::send(const std::string& target_host,
                              SendMessageRequest& request,
                              SendResultCallback cb) {
   assert(cb);
-  SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", 
target_host, request.DebugString());
+  SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", 
target_host, request.ShortDebugString());
   RpcClientSharedPtr client = getRpcClient(target_host);
   // Invocation context will be deleted in its onComplete() method.
   auto invocation_context = new InvocationContext<SendMessageResponse>();
@@ -440,7 +441,7 @@ bool ClientManagerImpl::send(const std::string& target_host,
 
       case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: {
         SPDLOG_WARN("Message-property-conflict-with-type: Host={}, 
Response={}", invocation_context->remote_address,
-                    invocation_context->response.DebugString());
+                    invocation_context->response.ShortDebugString());
         send_result.ec = ErrorCode::MessagePropertyConflictWithType;
         break;
       }
@@ -482,7 +483,7 @@ RpcClientSharedPtr ClientManagerImpl::getRpcClient(const 
std::string& target_hos
     auto search = rpc_clients_.find(target_host);
     if (search == rpc_clients_.end() || !search->second->ok()) {
       if (search == rpc_clients_.end()) {
-        SPDLOG_INFO("Create a RPC client to {}", target_host.data());
+        SPDLOG_INFO("Create a RPC client to [{}]", target_host.data());
       } else if (!search->second->ok()) {
         SPDLOG_INFO("Prior RPC client to {} is not OK. Re-create one", 
target_host);
       }
@@ -549,7 +550,7 @@ void ClientManagerImpl::resolveRoute(const std::string& 
target_host,
                                      std::chrono::milliseconds timeout,
                                      const std::function<void(const 
std::error_code&, const TopicRouteDataPtr&)>& cb) {
   SPDLOG_DEBUG("Name server connection URL: {}", target_host);
-  SPDLOG_DEBUG("Query route request: {}", request.DebugString());
+  SPDLOG_DEBUG("Query route request: {}", request.ShortDebugString());
   RpcClientSharedPtr client = getRpcClient(target_host, false);
   if (!client) {
     SPDLOG_WARN("Failed to create RPC client for name server[host={}]", 
target_host);
diff --git a/cpp/source/client/LogInterceptor.cpp b/cpp/source/client/LogInterceptor.cpp
index 77028645..9e6eded0 100644
--- a/cpp/source/client/LogInterceptor.cpp
+++ b/cpp/source/client/LogInterceptor.cpp
@@ -52,8 +52,8 @@ void LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth
   if (methods->QueryInterceptionHookPoint(grpc::experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
     std::multimap<std::string, std::string>* metadata = 
methods->GetSendInitialMetadata();
     if (metadata) {
-      SPDLOG_DEBUG("[Outbound]Headers of {}: \n{}", client_rpc_info_->method(),
-                   absl::StrJoin(*metadata, "\n", absl::PairFormatter(" --> 
")));
+      SPDLOG_DEBUG("[Outbound]Headers of {}: {}", client_rpc_info_->method(),
+                   absl::StrJoin(*metadata, " ", absl::PairFormatter(" --> 
")));
     }
   }
 
@@ -73,8 +73,8 @@ void 
LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth
                                  absl::string_view(it.second.data(), 
it.second.length())});
       }
       if (!response_headers.empty()) {
-        SPDLOG_DEBUG("[Inbound]Response Headers of {}:\n{}", 
client_rpc_info_->method(),
-                     absl::StrJoin(response_headers, "\n", 
absl::PairFormatter(" --> ")));
+        SPDLOG_DEBUG("[Inbound]Response Headers of {}: {}", 
client_rpc_info_->method(),
+                     absl::StrJoin(response_headers, " ", 
absl::PairFormatter(" --> ")));
       } else {
         SPDLOG_DEBUG("[Inbound]Response metadata of {} is empty", 
client_rpc_info_->method());
       }
@@ -85,12 +85,12 @@ void 
LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth
     void* message = methods->GetRecvMessage();
     if (message) {
       auto* response = reinterpret_cast<google::protobuf::Message*>(message);
-      std::string&& response_text = response->DebugString();
+      std::string&& response_text = response->ShortDebugString();
       std::size_t limit = 1024;
       if (response_text.size() <= limit) {
-        SPDLOG_DEBUG("[Inbound] {}\n{}", client_rpc_info_->method(), 
response_text);
+        SPDLOG_DEBUG("[Inbound] {} {}", client_rpc_info_->method(), 
response_text);
       } else {
-        SPDLOG_DEBUG("[Inbound] {}\n{}...", client_rpc_info_->method(), 
response_text.substr(0, limit));
+        SPDLOG_DEBUG("[Inbound] {} {}...", client_rpc_info_->method(), 
response_text.substr(0, limit));
       }
     }
   }
diff --git a/cpp/source/client/RpcClientImpl.cpp b/cpp/source/client/RpcClientImpl.cpp
index 35016c34..d9f10212 100644
--- a/cpp/source/client/RpcClientImpl.cpp
+++ b/cpp/source/client/RpcClientImpl.cpp
@@ -16,7 +16,6 @@
  */
 #include "RpcClientImpl.h"
 
-#include <chrono>
 #include <functional>
 #include <sstream>
 #include <thread>
@@ -26,7 +25,6 @@
 #include "RpcClient.h"
 #include "TelemetryBidiReactor.h"
 #include "TlsHelper.h"
-#include "absl/time/time.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
diff --git a/cpp/source/client/SessionImpl.cpp b/cpp/source/client/SessionImpl.cpp
index b3f8b73b..151f1c3f 100644
--- a/cpp/source/client/SessionImpl.cpp
+++ b/cpp/source/client/SessionImpl.cpp
@@ -34,7 +34,7 @@ bool SessionImpl::await() {
 
 void SessionImpl::syncSettings() {
   auto ptr = client_.lock();
-  SPDLOG_INFO("Sync client settings to {}", rpc_client_->remoteAddress());
+  SPDLOG_INFO("Request client settings to {}", rpc_client_->remoteAddress());
   TelemetryCommand command;
   command.mutable_settings()->CopyFrom(ptr->clientSettings());
   telemetry_->write(command);
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp
index e0a83a28..27557cf2 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -16,15 +16,12 @@
  */
 #include "TelemetryBidiReactor.h"
 
-#include <atomic>
-#include <cstdint>
 #include <memory>
 #include <utility>
 
 #include "ClientManager.h"
 #include "MessageExt.h"
 #include "Metadata.h"
-#include "RpcClient.h"
 #include "Signature.h"
 #include "google/protobuf/util/time_util.h"
 #include "rocketmq/Logger.h"
@@ -70,7 +67,7 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
   RemoveHold();
 
   if (!ok) {
-    SPDLOG_WARN("Failed to write telemetry command {} to {}", 
writes_.front().DebugString(), peer_address_);
+    SPDLOG_WARN("Failed to write telemetry command {} to {}", 
writes_.front().ShortDebugString(), peer_address_);
     signalClose();
     return;
   }
@@ -91,7 +88,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
   if (!ok) {
     // for read stream
     RemoveHold();
-    SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_);
+    // SPDLOG_WARN("Failed to read from telemetry stream from {}", 
peer_address_);
     signalClose();
     return;
   }
@@ -103,7 +100,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
     }
   }
 
-  SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, 
read_.DebugString());
+  SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, 
read_.ShortDebugString());
   auto client = client_.lock();
   if (!client) {
     SPDLOG_INFO("Client for {} has destructed", peer_address_);
@@ -114,19 +111,20 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
   switch (read_.command_case()) {
     case rmq::TelemetryCommand::kSettings: {
       auto settings = read_.settings();
-      SPDLOG_INFO("Received settings from {}: {}", peer_address_, 
settings.DebugString());
+      SPDLOG_INFO("Receive settings from {}: {}", peer_address_, 
settings.ShortDebugString());
       applySettings(settings);
       sync_settings_promise_.set_value(true);
       break;
     }
+
     case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
-      SPDLOG_DEBUG("Receive orphan transaction command: {}", 
read_.DebugString());
-      auto message = client->manager()->wrapMessage(read_.release_verify_message_command()->message());
+      SPDLOG_INFO("Receive orphan transaction command: {}", 
read_.ShortDebugString());
+      auto message = client->manager()->wrapMessage(
+          read_.recover_orphaned_transaction_command().message());
       auto raw = const_cast<Message*>(message.get());
       raw->mutableExtension().target_endpoint = peer_address_;
       raw->mutableExtension().transaction_id = 
read_.recover_orphaned_transaction_command().transaction_id();
       client->recoverOrphanedTransaction(message);
-
       break;
     }
 
@@ -156,7 +154,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
     }
 
     default: {
-      SPDLOG_WARN("Unsupported command");
+      SPDLOG_WARN("Telemetry command receive unsupported command");
       break;
     }
   }
@@ -291,7 +289,7 @@ void TelemetryBidiReactor::tryWriteNext() {
   }
 
   if (!writes_.empty()) {
-    SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, 
writes_.front().DebugString());
+    SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, 
writes_.front().ShortDebugString());
     AddHold();
     StartWrite(&(writes_.front()));
   }
diff --git a/cpp/source/client/include/TopicRouteData.h b/cpp/source/client/include/TopicRouteData.h
index 807ac811..aac41f1c 100644
--- a/cpp/source/client/include/TopicRouteData.h
+++ b/cpp/source/client/include/TopicRouteData.h
@@ -43,7 +43,7 @@ public:
 
   std::string debugString() const {
     return absl::StrJoin(message_queues_.begin(), message_queues_.end(), ",",
-                         [](std::string* out, const rmq::MessageQueue& m) { 
out->append(m.DebugString()); });
+                         [](std::string* out, const rmq::MessageQueue& m) { 
out->append(m.ShortDebugString()); });
   };
 
 private:
diff --git a/cpp/source/log/LoggerImpl.cpp b/cpp/source/log/LoggerImpl.cpp
index 6ff39f10..cdd2f473 100644
--- a/cpp/source/log/LoggerImpl.cpp
+++ b/cpp/source/log/LoggerImpl.cpp
@@ -131,6 +131,7 @@ Logger& getLogger() {
 const std::size_t LoggerImpl::DEFAULT_MAX_LOG_FILE_QUANTITY = 16;
 const std::size_t LoggerImpl::DEFAULT_FILE_SIZE = 1048576 * 256;
 const char* LoggerImpl::USER_HOME_ENV = "HOME";
-const char* LoggerImpl::DEFAULT_PATTERN = "[%Y/%m/%d-%H:%M:%S.%e %z] [%n] [%^---%L---%$] [thread %t] %v %@";
+const char* LoggerImpl::DEFAULT_PATTERN = "%Y-%m-%d %H:%M:%S.%e [%^--%L--%$] [%7t] %v %@";
+// const char* LoggerImpl::DEFAULT_PATTERN = "[%Y/%m/%d-%H:%M:%S.%e %z] [%n] [%^---%L---%$] [thread %t] %v %@";
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/ClientImpl.cpp b/cpp/source/rocketmq/ClientImpl.cpp
index 5be559b6..a4026cf6 100644
--- a/cpp/source/rocketmq/ClientImpl.cpp
+++ b/cpp/source/rocketmq/ClientImpl.cpp
@@ -19,11 +19,9 @@
 #include <algorithm>
 #include <atomic>
 #include <chrono>
-#include <cstdint>
 #include <cstdlib>
 #include <exception>
 #include <functional>
-#include <iterator>
 #include <memory>
 #include <string>
 #include <system_error>
@@ -43,9 +41,6 @@
 #include "absl/strings/str_split.h"
 #include "fmt/format.h"
 #include "opencensus/stats/stats.h"
-#include "rocketmq/Logger.h"
-#include "rocketmq/Message.h"
-#include "rocketmq/MessageListener.h"
 #include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
@@ -175,12 +170,14 @@ void ClientImpl::start() {
   auto telemetry_functor = [ptr]() {
     std::shared_ptr<ClientImpl> base = ptr.lock();
     if (base) {
-      SPDLOG_INFO("Sync client settings to servers");
+      SPDLOG_DEBUG("Sync client settings to servers");
       base->syncClientSettings();
     }
   };
-  telemetry_handle_ = 
client_manager_->getScheduler()->schedule(telemetry_functor, 
TELEMETRY_TASK_NAME,
-                                                                
std::chrono::minutes(5), std::chrono::minutes(5));
+
+  telemetry_handle_ = client_manager_->getScheduler()->schedule(
+      telemetry_functor, TELEMETRY_TASK_NAME,
+      std::chrono::minutes(5), std::chrono::minutes(5));
 
   auto&& metric_service_endpoint = metricServiceEndpoint();
   if (!metric_service_endpoint.empty()) {
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp
index 34c5b29c..9b664d59 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -227,7 +227,10 @@ SendReceipt ProducerImpl::send(MessageConstPtr message, 
std::error_code& ec) noe
   auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& 
receipt) mutable {
     ec = code;
     SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+    send_receipt.target = std::move(receipt_mut.target);
+    send_receipt.message_id = std::move(receipt_mut.message_id);
     send_receipt.message = std::move(receipt_mut.message);
+    send_receipt.transaction_id = std::move(receipt_mut.transaction_id);
     {
       absl::MutexLock lk(mtx.get());
       completed = true;
@@ -354,7 +357,7 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> 
context) {
   client_manager_->send(target, metadata, request, callback);
 }
 
-void ProducerImpl::send0(MessageConstPtr message, SendCallback callback, 
std::vector<rmq::MessageQueue> list) {
+void ProducerImpl::send0(MessageConstPtr message, const SendCallback& 
callback, std::vector<rmq::MessageQueue> list) {
   SendReceipt send_receipt;
   std::error_code ec;
   validate(*message, ec);
@@ -371,7 +374,8 @@ void ProducerImpl::send0(MessageConstPtr message, 
SendCallback callback, std::ve
     return;
   }
 
-  auto context = std::make_shared<SendContext>(shared_from_this(), 
std::move(message), callback, std::move(list));
+  auto context = std::make_shared<SendContext>(
+      shared_from_this(), std::move(message), callback, std::move(list));
   sendImpl(context);
 }
 
diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h
index d7693962..25cef46c 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -147,6 +147,7 @@ protected:
   absl::flat_hash_map<std::string, std::vector<std::function<void(const 
std::error_code&, const TopicRouteDataPtr&)>>>
       inflight_route_requests_ GUARDED_BY(inflight_route_requests_mtx_);
   absl::Mutex inflight_route_requests_mtx_ 
ACQUIRED_BEFORE(topic_route_table_mtx_); // Protects inflight_route_requests_
+
   static const char* UPDATE_ROUTE_TASK_NAME;
   std::uint32_t route_update_handle_{0};
 
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h b/cpp/source/rocketmq/include/ProducerImpl.h
index b572f20d..2c284172 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -176,7 +176,7 @@ private:
 
   void validate(const Message& message, std::error_code& ec);
 
-  void send0(MessageConstPtr message, SendCallback callback, 
std::vector<rmq::MessageQueue> list);
+  void send0(MessageConstPtr message, const SendCallback& callback, 
std::vector<rmq::MessageQueue> list);
 
   void isolatedEndpoints(absl::flat_hash_set<std::string>& endpoints) 
LOCKS_EXCLUDED(isolated_endpoints_mtx_);
 

Reply via email to