This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 55e7dd23 [ISSUE #928] Fix C++ simple consumer error code and close 
function (#931)
55e7dd23 is described below

commit 55e7dd238489a521cd86cde6df557a35badcb807
Author: lizhimins <707364...@qq.com>
AuthorDate: Wed Jan 22 20:42:40 2025 +0800

    [ISSUE #928] Fix C++ simple consumer error code and close function (#931)
---
 cpp/examples/ExampleSimpleConsumer.cpp        |  54 ++++---
 cpp/proto/apache/rocketmq/v2/definition.proto | 128 +++++++++++++++-
 cpp/proto/apache/rocketmq/v2/service.proto    | 213 ++++++++++++--------------
 cpp/source/client/ClientManagerImpl.cpp       |   2 +-
 cpp/source/client/TelemetryBidiReactor.cpp    |  16 +-
 cpp/source/rocketmq/ClientImpl.cpp            |   6 +-
 cpp/source/rocketmq/SimpleConsumer.cpp        |   3 +-
 cpp/source/rocketmq/SimpleConsumerImpl.cpp    |  28 +++-
 8 files changed, 287 insertions(+), 163 deletions(-)

diff --git a/cpp/examples/ExampleSimpleConsumer.cpp 
b/cpp/examples/ExampleSimpleConsumer.cpp
index 41262ad0..c28d2e4d 100644
--- a/cpp/examples/ExampleSimpleConsumer.cpp
+++ b/cpp/examples/ExampleSimpleConsumer.cpp
@@ -18,6 +18,7 @@
 #include <iostream>
 
 #include "gflags/gflags.h"
+#include "rocketmq/ErrorCode.h"
 #include "rocketmq/Logger.h"
 #include "rocketmq/SimpleConsumer.h"
 
@@ -42,10 +43,11 @@ int main(int argc, char* argv[]) {
 
   CredentialsProviderPtr credentials_provider;
   if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
-    credentials_provider = 
std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, 
FLAGS_access_secret);
+    credentials_provider = std::make_shared<StaticCredentialsProvider>(
+        FLAGS_access_key, FLAGS_access_secret);
   }
 
-  // In most case, you don't need to create too many consumers, singletion 
pattern is recommended.
+  // In most case, you don't need to create too many consumers, singleton 
pattern is recommended.
   auto simple_consumer = SimpleConsumer::newBuilder()
                              .withGroup(FLAGS_group)
                              .withConfiguration(Configuration::newBuilder()
@@ -54,32 +56,36 @@ int main(int argc, char* argv[]) {
                                                     .withSsl(FLAGS_tls)
                                                     .build())
                              .subscribe(FLAGS_topic, tag)
+                             .withAwaitDuration(std::chrono::seconds(10))
                              .build();
-  std::vector<MessageConstSharedPtr> messages;
-  std::error_code ec;
-  simple_consumer.receive(4, std::chrono::seconds(3), ec, messages);
 
-  if (ec) {
-    std::cerr << "Failed to receive messages. Cause: " << ec.message() << 
std::endl;
-    return EXIT_FAILURE;
-  }
+  for (int j = 0; j < 30; j++) {
+    std::vector<MessageConstSharedPtr> messages;
+    std::error_code ec;
+    simple_consumer.receive(4, std::chrono::seconds(15), ec, messages);
+    if (ec) {
+      std::cerr << "Failed to receive messages. Cause: " << ec.message() << 
std::endl;
+    }
 
-  std::cout << "Received " << messages.size() << " messages" << std::endl;
-  std::size_t i = 0;
-  for (const auto& message : messages) {
-    std::cout << "Received a message[topic=" << message->topic() << ", 
message-id=" << message->id()
-              << ", receipt-handle='" << message->extension().receipt_handle 
<< "']" << std::endl;
+    std::cout << "Received " << messages.size() << " messages" << std::endl;
+    std::size_t i = 0;
 
-    std::error_code ec;
-    if (++i % 2 == 0) {
-      simple_consumer.ack(*message, ec);
-      if (ec) {
-        std::cerr << "Failed to ack message. Cause: " << ec.message() << 
std::endl;
-      }
-    } else {
-      simple_consumer.changeInvisibleDuration(*message, 
std::chrono::milliseconds(100), ec);
-      if (ec) {
-        std::cerr << "Failed to change invisible duration of message. Cause: " 
<< ec.message() << std::endl;
+    for (const auto& message : messages) {
+      std::cout << "Received a message[topic=" << message->topic()
+                << ", message-id=" << message->id()
+                << ", receipt-handle='" << message->extension().receipt_handle
+                << "']" << std::endl;
+
+      if (++i % 2 == 0) {
+        simple_consumer.ack(*message, ec);
+        if (ec) {
+          std::cerr << "Failed to ack message. Cause: " << ec.message() << 
std::endl;
+        }
+      } else {
+        simple_consumer.changeInvisibleDuration(*message, 
std::chrono::seconds(3), ec);
+        if (ec) {
+          std::cerr << "Failed to change invisible duration of message. Cause: 
" << ec.message() << std::endl;
+        }
       }
     }
   }
diff --git a/cpp/proto/apache/rocketmq/v2/definition.proto 
b/cpp/proto/apache/rocketmq/v2/definition.proto
index 67e58b8f..468c4105 100644
--- a/cpp/proto/apache/rocketmq/v2/definition.proto
+++ b/cpp/proto/apache/rocketmq/v2/definition.proto
@@ -175,10 +175,6 @@ enum DigestType {
 // 1) Standard messages should be negatively acknowledged instantly, causing
 // immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
 // previously acquired messages batch;
-//
-// Message consumption model also affects how invalid digest are handled. When
-// messages are consumed in broadcasting way,
-// TODO: define semantics of invalid-digest-when-broadcasting.
 message Digest {
   DigestType type = 1;
   string checksum = 2;
@@ -189,6 +185,7 @@ enum ClientType {
   PRODUCER = 1;
   PUSH_CONSUMER = 2;
   SIMPLE_CONSUMER = 3;
+  PULL_CONSUMER = 4;
 }
 
 enum Encoding {
@@ -270,9 +267,20 @@ message SystemProperties {
   // orphan. Servers that manages orphan messages would pick up
   // a capable publisher to resolve
   optional google.protobuf.Duration orphaned_transaction_recovery_duration = 
19;
+
+  // Information to identify whether this message is from dead letter queue.
+  optional DeadLetterQueue dead_letter_queue = 20;
+}
+
+message DeadLetterQueue {
+  // Original topic for this DLQ message.
+  string topic = 1;
+  // Original message id for this DLQ message.
+  string message_id = 2;
 }
 
 message Message {
+
   Resource topic = 1;
 
   // User defined key-value pairs.
@@ -336,6 +344,10 @@ enum Code {
   MESSAGE_CORRUPTED = 40016;
   // Request is rejected due to missing of x-mq-client-id header.
   CLIENT_ID_REQUIRED = 40017;
+  // Polling time is illegal.
+  ILLEGAL_POLLING_TIME = 40018;
+  // Offset is illegal.
+  ILLEGAL_OFFSET = 40019;
 
   // Generic code indicates that the client request lacks valid authentication
   // credentials for the requested resource.
@@ -355,6 +367,8 @@ enum Code {
   TOPIC_NOT_FOUND = 40402;
   // Consumer group resource does not exist.
   CONSUMER_GROUP_NOT_FOUND = 40403;
+  // Offset not found from server.
+  OFFSET_NOT_FOUND = 40404;
 
   // Generic code representing client side timeout when connecting to, reading 
data from, or write data to server.
   REQUEST_TIMEOUT = 40800;
@@ -363,6 +377,8 @@ enum Code {
   PAYLOAD_TOO_LARGE = 41300;
   // Message body size exceeds the threshold.
   MESSAGE_BODY_TOO_LARGE = 41301;
+  // Message body is empty.
+  MESSAGE_BODY_EMPTY = 41302;
 
   // Generic code for use cases where pre-conditions are not met.
   // For example, if a producer instance is used to publish messages without 
prior start() invocation,
@@ -432,6 +448,13 @@ enum Language {
   DOT_NET = 3;
   GOLANG = 4;
   RUST = 5;
+  PYTHON = 6;
+  PHP = 7;
+  NODE_JS = 8;
+  RUBY = 9;
+  OBJECTIVE_C = 10;
+  DART = 11;
+  KOTLIN = 12;
 }
 
 // User Agent
@@ -447,4 +470,101 @@ message UA {
 
   // Hostname of the node
   string hostname = 4;
+}
+
+message Settings {
+  // Configurations for all clients.
+  optional ClientType client_type = 1;
+
+  optional Endpoints access_point = 2;
+
+  // If publishing of messages encounters throttling or server internal errors,
+  // publishers should implement automatic retries after progressive longer
+  // back-offs for consecutive errors.
+  //
+  // When processing message fails, `backoff_policy` describes an interval
+  // after which the message should be available to consume again.
+  //
+  // For FIFO messages, the interval should be relatively small because
+  // messages of the same message group would not be readily available until
+  // the prior one depletes its lifecycle.
+  optional RetryPolicy backoff_policy = 3;
+
+  // Request timeout for RPCs excluding long-polling.
+  optional google.protobuf.Duration request_timeout = 4;
+
+  oneof pub_sub {
+    Publishing publishing = 5;
+
+    Subscription subscription = 6;
+  }
+
+  // User agent details
+  UA user_agent = 7;
+
+  Metric metric = 8;
+}
+
+message Publishing {
+  // Publishing settings below here is appointed by client, thus it is
+  // unnecessary for server to push at present.
+  //
+  // List of topics to which messages will publish to.
+  repeated Resource topics = 1;
+
+  // If the message body size exceeds `max_body_size`, broker servers would
+  // reject the request. As a result, it is advisable that Producer performs
+  // client-side check validation.
+  int32 max_body_size = 2;
+
+  // When `validate_message_type` flag set `false`, no need to validate 
message's type
+  // with messageQueue's `accept_message_types` before publishing.
+  bool validate_message_type = 3;
+}
+
+message Subscription {
+  // Subscription settings below here is appointed by client, thus it is
+  // unnecessary for server to push at present.
+  //
+  // Consumer group.
+  optional Resource group = 1;
+
+  // Subscription for consumer.
+  repeated SubscriptionEntry subscriptions = 2;
+
+  // Subscription settings below here are from server, it is essential for
+  // server to push.
+  //
+  // When FIFO flag is `true`, messages of the same message group are processed
+  // in first-in-first-out manner.
+  //
+  // Brokers will not deliver further messages of the same group until prior
+  // ones are completely acknowledged.
+  optional bool fifo = 3;
+
+  // Message receive batch size here is essential for push consumer.
+  optional int32 receive_batch_size = 4;
+
+  // Long-polling timeout for `ReceiveMessageRequest`, which is essential for
+  // push consumer.
+  optional google.protobuf.Duration long_polling_timeout = 5;
+}
+
+message Metric {
+  // Indicates that if client should export local metrics to server.
+  bool on = 1;
+
+  // The endpoint that client metrics should be exported to, which is required 
if the switch is on.
+  optional Endpoints endpoints = 2;
+}
+
+enum QueryOffsetPolicy {
+  // Use this option if client wishes to playback all existing messages.
+  BEGINNING = 0;
+
+  // Use this option if client wishes to skip all existing messages.
+  END = 1;
+
+  // Use this option if time-based seek is targeted.
+  TIMESTAMP = 2;
 }
\ No newline at end of file
diff --git a/cpp/proto/apache/rocketmq/v2/service.proto 
b/cpp/proto/apache/rocketmq/v2/service.proto
index 715594e3..1a3dbbe9 100644
--- a/cpp/proto/apache/rocketmq/v2/service.proto
+++ b/cpp/proto/apache/rocketmq/v2/service.proto
@@ -66,6 +66,8 @@ message SendResultEntry {
   string message_id = 2;
   string transaction_id = 3;
   int64 offset = 4;
+  // Unique handle to identify message to recall, support delay message for 
now.
+  string recall_handle = 5;
 }
 
 message SendMessageResponse {
@@ -96,6 +98,8 @@ message ReceiveMessageRequest {
   optional google.protobuf.Duration invisible_duration = 5;
   // For message auto renew and clean
   bool auto_renew = 6;
+  optional google.protobuf.Duration long_polling_timeout = 7;
+  optional string attempt_id = 8;
 }
 
 message ReceiveMessageResponse {
@@ -129,6 +133,7 @@ message AckMessageResultEntry {
 }
 
 message AckMessageResponse {
+
   // RPC tier status, which is used to represent RPC-level errors including
   // authentication, authorization, throttling and other general failures.
   Status status = 1;
@@ -145,18 +150,14 @@ message ForwardMessageToDeadLetterQueueRequest {
   int32 max_delivery_attempts = 6;
 }
 
-message ForwardMessageToDeadLetterQueueResponse {
-  Status status = 1;
-}
+message ForwardMessageToDeadLetterQueueResponse { Status status = 1; }
 
 message HeartbeatRequest {
   optional Resource group = 1;
   ClientType client_type = 2;
 }
 
-message HeartbeatResponse {
-  Status status = 1;
-}
+message HeartbeatResponse { Status status = 1; }
 
 message EndTransactionRequest {
   Resource topic = 1;
@@ -167,13 +168,9 @@ message EndTransactionRequest {
   string trace_context = 6;
 }
 
-message EndTransactionResponse {
-  Status status = 1;
-}
+message EndTransactionResponse { Status status = 1; }
 
-message PrintThreadStackTraceCommand {
-  string nonce = 1;
-}
+message PrintThreadStackTraceCommand { string nonce = 1; }
 
 message ThreadStackTrace {
   string nonce = 1;
@@ -194,92 +191,6 @@ message RecoverOrphanedTransactionCommand {
   string transaction_id = 2;
 }
 
-message Publishing {
-  // Publishing settings below here is appointed by client, thus it is
-  // unnecessary for server to push at present.
-  //
-  // List of topics to which messages will publish to.
-  repeated Resource topics = 1;
-
-  // If the message body size exceeds `max_body_size`, broker servers would
-  // reject the request. As a result, it is advisable that Producer performs
-  // client-side check validation.
-  int32 max_body_size = 2;
-
-  // When `validate_message_type` flag set `false`, no need to validate 
message's type
-  // with messageQueue's `accept_message_types` before publishing.
-  bool validate_message_type = 3;
-}
-
-message Subscription {
-  // Subscription settings below here is appointed by client, thus it is
-  // unnecessary for server to push at present.
-  //
-  // Consumer group.
-  optional Resource group = 1;
-
-  // Subscription for consumer.
-  repeated SubscriptionEntry subscriptions = 2;
-
-  // Subscription settings below here are from server, it is essential for
-  // server to push.
-  //
-  // When FIFO flag is `true`, messages of the same message group are processed
-  // in first-in-first-out manner.
-  //
-  // Brokers will not deliver further messages of the same group until prior
-  // ones are completely acknowledged.
-  optional bool fifo = 3;
-
-  // Message receive batch size here is essential for push consumer.
-  optional int32 receive_batch_size = 4;
-
-  // Long-polling timeout for `ReceiveMessageRequest`, which is essential for
-  // push consumer.
-  optional google.protobuf.Duration long_polling_timeout = 5;
-}
-
-message Metric {
-  // Indicates that if client should export local metrics to server.
-  bool on = 1;
-
-  // The endpoint that client metrics should be exported to, which is required 
if the switch is on.
-  optional Endpoints endpoints = 2;
-}
-
-message Settings {
-  // Configurations for all clients.
-  optional ClientType client_type = 1;
-
-  optional Endpoints access_point = 2;
-
-  // If publishing of messages encounters throttling or server internal errors,
-  // publishers should implement automatic retries after progressive longer
-  // back-offs for consecutive errors.
-  //
-  // When processing message fails, `backoff_policy` describes an interval
-  // after which the message should be available to consume again.
-  //
-  // For FIFO messages, the interval should be relatively small because
-  // messages of the same message group would not be readily available until
-  // the prior one depletes its lifecycle.
-  optional RetryPolicy backoff_policy = 3;
-
-  // Request timeout for RPCs excluding long-polling.
-  optional google.protobuf.Duration request_timeout = 4;
-
-  oneof pub_sub {
-    Publishing publishing = 5;
-
-    Subscription subscription = 6;
-  }
-
-  // User agent details
-  UA user_agent = 7;
-
-  Metric metric = 8;
-}
-
 message TelemetryCommand {
   optional Status status = 1;
 
@@ -313,9 +224,7 @@ message NotifyClientTerminationRequest {
   optional Resource group = 1;
 }
 
-message NotifyClientTerminationResponse {
-  Status status = 1;
-}
+message NotifyClientTerminationResponse { Status status = 1; }
 
 message ChangeInvisibleDurationRequest {
   Resource group = 1;
@@ -338,6 +247,65 @@ message ChangeInvisibleDurationResponse {
   string receipt_handle = 2;
 }
 
+message PullMessageRequest {
+  Resource group = 1;
+  MessageQueue message_queue = 2;
+  int64 offset = 3;
+  int32 batch_size = 4;
+  FilterExpression filter_expression = 5;
+  google.protobuf.Duration long_polling_timeout = 6;
+}
+
+message PullMessageResponse {
+  oneof content {
+    Status status = 1;
+    Message message = 2;
+    int64 next_offset = 3;
+  }
+}
+
+message UpdateOffsetRequest {
+  Resource group = 1;
+  MessageQueue message_queue = 2;
+  int64 offset = 3;
+}
+
+message UpdateOffsetResponse {
+  Status status = 1;
+}
+
+message GetOffsetRequest {
+  Resource group = 1;
+  MessageQueue message_queue = 2;
+}
+
+message GetOffsetResponse {
+  Status status = 1;
+  int64 offset = 2;
+}
+
+message QueryOffsetRequest {
+  MessageQueue message_queue = 1;
+  QueryOffsetPolicy query_offset_policy = 2;
+  optional google.protobuf.Timestamp timestamp = 3;
+}
+
+message QueryOffsetResponse {
+  Status status = 1;
+  int64 offset = 2;
+}
+
+message RecallMessageRequest {
+  Resource topic = 1;
+  // Refer to SendResultEntry.
+  string recall_handle = 2;
+}
+
+message RecallMessageResponse {
+  Status status = 1;
+  string message_id = 2;
+}
+
 // For all the RPCs in MessagingService, the following error handling policies
 // apply:
 //
@@ -349,6 +317,7 @@ message ChangeInvisibleDurationResponse {
 // common.status.code == `RESOURCE_EXHAUSTED`. If any unexpected server-side
 // errors raise, return a response with common.status.code == `INTERNAL`.
 service MessagingService {
+
   // Queries the route entries of the requested topic in the perspective of the
   // given endpoints. On success, servers should return a collection of
   // addressable message-queues. Note servers may return customized route
@@ -356,8 +325,7 @@ service MessagingService {
   //
   // If the requested topic doesn't exist, returns `NOT_FOUND`.
   // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
-  rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {
-  }
+  rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {}
 
   // Producer or consumer sends HeartbeatRequest to servers periodically to
   // keep-alive. Additionally, it also reports client-side configuration,
@@ -367,8 +335,7 @@ service MessagingService {
   //
   // If a client specifies a language that is not yet supported by servers,
   // returns `INVALID_ARGUMENT`
-  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {
-  }
+  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
 
   // Delivers messages to brokers.
   // Clients may further:
@@ -383,8 +350,7 @@ service MessagingService {
   // Returns message-id or transaction-id with status `OK` on success.
   //
   // If the destination topic doesn't exist, returns `NOT_FOUND`.
-  rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {
-  }
+  rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
 
   // Queries the assigned route info of a topic for current consumer,
   // the returned assignment result is decided by server-side load balancer.
@@ -418,18 +384,30 @@ service MessagingService {
   //
   // If the given receipt_handle is illegal or out of date, returns
   // `INVALID_ARGUMENT`.
-  rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {
-  }
+  rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
 
   // Forwards one message to dead letter queue if the max delivery attempts is
   // exceeded by this message at client-side, return `OK` if success.
   rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
-      returns (ForwardMessageToDeadLetterQueueResponse) {
-  }
+      returns (ForwardMessageToDeadLetterQueueResponse) {}
+
+  // PullMessage and ReceiveMessage RPCs serve a similar purpose,
+  // which is to attempt to get messages from the server, but with different 
semantics.
+  rpc PullMessage(PullMessageRequest) returns (stream PullMessageResponse) {}
+
+  // Update the consumption progress of the designated queue of the
+  // consumer group to the remote.
+  rpc UpdateOffset(UpdateOffsetRequest) returns (UpdateOffsetResponse) {}
+
+  // Query the consumption progress of the designated queue of the
+  // consumer group to the remote.
+  rpc GetOffset(GetOffsetRequest) returns (GetOffsetResponse) {}
+
+  // Query the offset of the designated queue by the query offset policy.
+  rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {}
 
   // Commits or rollback one transactional message.
-  rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {
-  }
+  rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {}
 
   // Once a client starts, it would immediately establishes bi-lateral stream
   // RPCs with brokers, reporting its settings as the initiative command.
@@ -437,8 +415,7 @@ service MessagingService {
   // When servers have need of inspecting client status, they would issue
   // telemetry commands to clients. After executing received instructions,
   // clients shall report command execution results through client-side 
streams.
-  rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {
-  }
+  rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {}
 
   // Notify the server that the client is terminated.
   rpc NotifyClientTermination(NotifyClientTerminationRequest) returns 
(NotifyClientTerminationResponse) {
@@ -452,4 +429,10 @@ service MessagingService {
   // ChangeInvisibleDuration to lengthen invisible duration.
   rpc ChangeInvisibleDuration(ChangeInvisibleDurationRequest) returns 
(ChangeInvisibleDurationResponse) {
   }
+
+  // Recall a message,
+  // for delay message, should recall before delivery time, like the rollback 
operation of transaction message,
+  // for normal message, not supported for now.
+  rpc RecallMessage(RecallMessageRequest) returns (RecallMessageResponse) {
+  }
 }
\ No newline at end of file
diff --git a/cpp/source/client/ClientManagerImpl.cpp 
b/cpp/source/client/ClientManagerImpl.cpp
index bb1e2e67..053f7723 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -177,7 +177,7 @@ void ClientManagerImpl::heartbeat(const std::string& 
target_host,
                                   const HeartbeatRequest& request,
                                   std::chrono::milliseconds timeout,
                                   const std::function<void(const 
std::error_code&, const HeartbeatResponse&)>& cb) {
-  SPDLOG_DEBUG("Prepare to send heartbeat to {}. Request: {}", target_host, 
request.DebugString());
+  SPDLOG_DEBUG("Prepare to send heartbeat to {}. Request: {}", target_host, 
request.ShortDebugString());
   auto client = getRpcClient(target_host, true);
   auto invocation_context = new InvocationContext<HeartbeatResponse>();
   invocation_context->task_name = fmt::format("Heartbeat to {}", target_host);
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp 
b/cpp/source/client/TelemetryBidiReactor.cpp
index 27557cf2..74e8689c 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -301,7 +301,7 @@ void TelemetryBidiReactor::signalClose() {
 }
 
 void TelemetryBidiReactor::close() {
-  SPDLOG_INFO("{}#fireClose", peer_address_);
+  SPDLOG_DEBUG("{}#fireClose", peer_address_);
 
   {
     absl::MutexLock lk(&state_mtx_);
@@ -316,14 +316,12 @@ void TelemetryBidiReactor::close() {
   }
   context_.TryCancel();
 
-  {
-    // Acquire state lock
+  // Acquire state lock
+  while (StreamState::Closed != state_) {
     absl::MutexLock lk(&state_mtx_);
-    while (StreamState::Closed != state_) {
-      if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) {
-        SPDLOG_WARN("StreamState CondVar timed out before getting signalled: 
state={}",
-                    static_cast<uint8_t>(state_));
-      }
+    if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) {
+      SPDLOG_WARN("StreamState CondVar timed out before getting signalled: 
state={}",
+                  static_cast<uint8_t>(state_));
     }
   }
 }
@@ -338,7 +336,7 @@ void TelemetryBidiReactor::close() {
 void TelemetryBidiReactor::OnDone(const grpc::Status& status) {
   SPDLOG_DEBUG("{}#OnDone, status.ok={}", peer_address_, status.ok());
   if (!status.ok()) {
-    SPDLOG_WARN("{}#OnDone, status.error_code={}, status.error_message={}, 
status.error_details={}", peer_address_,
+    SPDLOG_DEBUG("{}#OnDone, status.error_code={}, status.error_message={}, 
status.error_details={}", peer_address_,
                 status.error_code(), status.error_message(), 
status.error_details());
   }
   {
diff --git a/cpp/source/rocketmq/ClientImpl.cpp 
b/cpp/source/rocketmq/ClientImpl.cpp
index a4026cf6..7f91b048 100644
--- a/cpp/source/rocketmq/ClientImpl.cpp
+++ b/cpp/source/rocketmq/ClientImpl.cpp
@@ -175,6 +175,8 @@ void ClientImpl::start() {
     }
   };
 
+  // refer java sdk: set refresh interval to 5 minutes
+  // org.apache.rocketmq.client.java.impl.ClientSessionImpl#syncSettings0
   telemetry_handle_ = client_manager_->getScheduler()->schedule(
       telemetry_functor, TELEMETRY_TASK_NAME,
       std::chrono::minutes(5), std::chrono::minutes(5));
@@ -401,8 +403,8 @@ void ClientImpl::heartbeat() {
       }
       SPDLOG_DEBUG("Heartbeat to {} OK", target);
     };
-    client_manager_->heartbeat(target, metadata, request, 
absl::ToChronoMilliseconds(client_config_.request_timeout),
-                               callback);
+    client_manager_->heartbeat(target, metadata, request,
+      absl::ToChronoMilliseconds(client_config_.request_timeout), callback);
   }
 }
 
diff --git a/cpp/source/rocketmq/SimpleConsumer.cpp 
b/cpp/source/rocketmq/SimpleConsumer.cpp
index a48a0e49..5ab92f4b 100644
--- a/cpp/source/rocketmq/SimpleConsumer.cpp
+++ b/cpp/source/rocketmq/SimpleConsumer.cpp
@@ -19,6 +19,7 @@
 
 #include "SimpleConsumerImpl.h"
 #include "StaticNameServerResolver.h"
+#include "rocketmq/ErrorCode.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -58,7 +59,7 @@ void SimpleConsumer::receive(std::size_t limit,
   auto callback = [&, mtx, cv](const std::error_code& code, const 
std::vector<MessageConstSharedPtr>& result) {
     {
       absl::MutexLock lk(mtx.get());
-      if (code) {
+      if (code && code != ErrorCode::NoContent) {
         ec = code;
         SPDLOG_WARN("Failed to receive message. Cause: {}", code.message());
       }
diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp 
b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
index 7a1b3edf..df060793 100644
--- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
@@ -94,8 +94,14 @@ void SimpleConsumerImpl::start() {
         simple_consumer->refreshAssignments0();
       }
     };
-    refresh_assignment_task_ = 
manager()->getScheduler()->schedule(refresh_assignment_task, 
"RefreshAssignmentTask",
-                                                                   
std::chrono::seconds(3), std::chrono::seconds(3));
+
+    // refer java sdk: set refresh interval to 30 seconds
+    // org.apache.rocketmq.client.java.impl.ClientImpl#startUp
+    refresh_assignment_task_ = manager()->getScheduler()->schedule(
+        refresh_assignment_task, "RefreshAssignmentTask",
+        std::chrono::minutes(5), std::chrono::seconds(5));
+
+    client_manager_->addClientObserver(shared_from_this());
   }
 }
 
@@ -307,12 +313,21 @@ void SimpleConsumerImpl::receive(std::size_t limit,
   request.set_auto_renew(false);
   request.mutable_group()->CopyFrom(config().subscriber.group);
   request.mutable_message_queue()->CopyFrom(assignment.message_queue());
-  request.set_batch_size(limit);
+  request.set_batch_size((int32_t) limit);
+
+  request.mutable_filter_expression()->set_type(rmq::FilterType::TAG);
+  request.mutable_filter_expression()->set_expression("*");
 
-  auto duration = 
google::protobuf::util::TimeUtil::MillisecondsToDuration(invisible_duration.count());
+  auto invisible_duration_request =
+      
google::protobuf::util::TimeUtil::MillisecondsToDuration(invisible_duration.count());
+  
request.mutable_invisible_duration()->set_nanos(invisible_duration_request.nanos());
+  
request.mutable_invisible_duration()->set_seconds(invisible_duration_request.seconds());
 
-  request.mutable_invisible_duration()->set_nanos(duration.nanos());
-  request.mutable_invisible_duration()->set_seconds(duration.seconds());
+  auto await_duration_request =
+      google::protobuf::util::TimeUtil::MillisecondsToDuration(
+          MixAll::millisecondsOf(long_polling_duration_));
+  
request.mutable_long_polling_timeout()->set_nanos(await_duration_request.nanos());
+  
request.mutable_long_polling_timeout()->set_seconds(await_duration_request.seconds());
 
   auto cb = [callback](const std::error_code& ec, const ReceiveMessageResult& 
result) {
     std::vector<MessageConstSharedPtr> messages;
@@ -324,7 +339,6 @@ void SimpleConsumerImpl::receive(std::size_t limit,
     callback(ec, result.messages);
   };
 
-  SPDLOG_DEBUG("ReceiveMessage.polling_timeout: {}ms", 
MixAll::millisecondsOf(long_polling_duration_));
   manager()->receiveMessage(target, metadata, request,
                             long_polling_duration_ + 
absl::ToChronoMilliseconds(requestTimeout()), cb);
 }

Reply via email to