This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new 55e7dd23 [ISSUE #928] Fix C++ simple consumer error code and close function (#931) 55e7dd23 is described below commit 55e7dd238489a521cd86cde6df557a35badcb807 Author: lizhimins <707364...@qq.com> AuthorDate: Wed Jan 22 20:42:40 2025 +0800 [ISSUE #928] Fix C++ simple consumer error code and close function (#931) --- cpp/examples/ExampleSimpleConsumer.cpp | 54 ++++--- cpp/proto/apache/rocketmq/v2/definition.proto | 128 +++++++++++++++- cpp/proto/apache/rocketmq/v2/service.proto | 213 ++++++++++++-------------- cpp/source/client/ClientManagerImpl.cpp | 2 +- cpp/source/client/TelemetryBidiReactor.cpp | 16 +- cpp/source/rocketmq/ClientImpl.cpp | 6 +- cpp/source/rocketmq/SimpleConsumer.cpp | 3 +- cpp/source/rocketmq/SimpleConsumerImpl.cpp | 28 +++- 8 files changed, 287 insertions(+), 163 deletions(-) diff --git a/cpp/examples/ExampleSimpleConsumer.cpp b/cpp/examples/ExampleSimpleConsumer.cpp index 41262ad0..c28d2e4d 100644 --- a/cpp/examples/ExampleSimpleConsumer.cpp +++ b/cpp/examples/ExampleSimpleConsumer.cpp @@ -18,6 +18,7 @@ #include <iostream> #include "gflags/gflags.h" +#include "rocketmq/ErrorCode.h" #include "rocketmq/Logger.h" #include "rocketmq/SimpleConsumer.h" @@ -42,10 +43,11 @@ int main(int argc, char* argv[]) { CredentialsProviderPtr credentials_provider; if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) { - credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret); + credentials_provider = std::make_shared<StaticCredentialsProvider>( + FLAGS_access_key, FLAGS_access_secret); } - // In most case, you don't need to create too many consumers, singletion pattern is recommended. + // In most case, you don't need to create too many consumers, singleton pattern is recommended. auto simple_consumer = SimpleConsumer::newBuilder() .withGroup(FLAGS_group) .withConfiguration(Configuration::newBuilder() @@ -54,32 +56,36 @@ int main(int argc, char* argv[]) { .withSsl(FLAGS_tls) .build()) .subscribe(FLAGS_topic, tag) + .withAwaitDuration(std::chrono::seconds(10)) .build(); - std::vector<MessageConstSharedPtr> messages; - std::error_code ec; - simple_consumer.receive(4, std::chrono::seconds(3), ec, messages); - if (ec) { - std::cerr << "Failed to receive messages. Cause: " << ec.message() << std::endl; - return EXIT_FAILURE; - } + for (int j = 0; j < 30; j++) { + std::vector<MessageConstSharedPtr> messages; + std::error_code ec; + simple_consumer.receive(4, std::chrono::seconds(15), ec, messages); + if (ec) { + std::cerr << "Failed to receive messages. Cause: " << ec.message() << std::endl; + } - std::cout << "Received " << messages.size() << " messages" << std::endl; - std::size_t i = 0; - for (const auto& message : messages) { - std::cout << "Received a message[topic=" << message->topic() << ", message-id=" << message->id() - << ", receipt-handle='" << message->extension().receipt_handle << "']" << std::endl; + std::cout << "Received " << messages.size() << " messages" << std::endl; + std::size_t i = 0; - std::error_code ec; - if (++i % 2 == 0) { - simple_consumer.ack(*message, ec); - if (ec) { - std::cerr << "Failed to ack message. Cause: " << ec.message() << std::endl; - } - } else { - simple_consumer.changeInvisibleDuration(*message, std::chrono::milliseconds(100), ec); - if (ec) { - std::cerr << "Failed to change invisible duration of message. Cause: " << ec.message() << std::endl; + for (const auto& message : messages) { + std::cout << "Received a message[topic=" << message->topic() + << ", message-id=" << message->id() + << ", receipt-handle='" << message->extension().receipt_handle + << "']" << std::endl; + + if (++i % 2 == 0) { + simple_consumer.ack(*message, ec); + if (ec) { + std::cerr << "Failed to ack message. Cause: " << ec.message() << std::endl; + } + } else { + simple_consumer.changeInvisibleDuration(*message, std::chrono::seconds(3), ec); + if (ec) { + std::cerr << "Failed to change invisible duration of message. Cause: " << ec.message() << std::endl; + } } } } diff --git a/cpp/proto/apache/rocketmq/v2/definition.proto b/cpp/proto/apache/rocketmq/v2/definition.proto index 67e58b8f..468c4105 100644 --- a/cpp/proto/apache/rocketmq/v2/definition.proto +++ b/cpp/proto/apache/rocketmq/v2/definition.proto @@ -175,10 +175,6 @@ enum DigestType { // 1) Standard messages should be negatively acknowledged instantly, causing // immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch // previously acquired messages batch; -// -// Message consumption model also affects how invalid digest are handled. When -// messages are consumed in broadcasting way, -// TODO: define semantics of invalid-digest-when-broadcasting. message Digest { DigestType type = 1; string checksum = 2; @@ -189,6 +185,7 @@ enum ClientType { PRODUCER = 1; PUSH_CONSUMER = 2; SIMPLE_CONSUMER = 3; + PULL_CONSUMER = 4; } enum Encoding { @@ -270,9 +267,20 @@ message SystemProperties { // orphan. Servers that manages orphan messages would pick up // a capable publisher to resolve optional google.protobuf.Duration orphaned_transaction_recovery_duration = 19; + + // Information to identify whether this message is from dead letter queue. + optional DeadLetterQueue dead_letter_queue = 20; +} + +message DeadLetterQueue { + // Original topic for this DLQ message. + string topic = 1; + // Original message id for this DLQ message. + string message_id = 2; } message Message { + Resource topic = 1; // User defined key-value pairs. @@ -336,6 +344,10 @@ enum Code { MESSAGE_CORRUPTED = 40016; // Request is rejected due to missing of x-mq-client-id header. CLIENT_ID_REQUIRED = 40017; + // Polling time is illegal. + ILLEGAL_POLLING_TIME = 40018; + // Offset is illegal. + ILLEGAL_OFFSET = 40019; // Generic code indicates that the client request lacks valid authentication // credentials for the requested resource. @@ -355,6 +367,8 @@ enum Code { TOPIC_NOT_FOUND = 40402; // Consumer group resource does not exist. CONSUMER_GROUP_NOT_FOUND = 40403; + // Offset not found from server. + OFFSET_NOT_FOUND = 40404; // Generic code representing client side timeout when connecting to, reading data from, or write data to server. REQUEST_TIMEOUT = 40800; @@ -363,6 +377,8 @@ enum Code { PAYLOAD_TOO_LARGE = 41300; // Message body size exceeds the threshold. MESSAGE_BODY_TOO_LARGE = 41301; + // Message body is empty. + MESSAGE_BODY_EMPTY = 41302; // Generic code for use cases where pre-conditions are not met. // For example, if a producer instance is used to publish messages without prior start() invocation, @@ -432,6 +448,13 @@ enum Language { DOT_NET = 3; GOLANG = 4; RUST = 5; + PYTHON = 6; + PHP = 7; + NODE_JS = 8; + RUBY = 9; + OBJECTIVE_C = 10; + DART = 11; + KOTLIN = 12; } // User Agent @@ -447,4 +470,101 @@ message UA { // Hostname of the node string hostname = 4; +} + +message Settings { + // Configurations for all clients. + optional ClientType client_type = 1; + + optional Endpoints access_point = 2; + + // If publishing of messages encounters throttling or server internal errors, + // publishers should implement automatic retries after progressive longer + // back-offs for consecutive errors. + // + // When processing message fails, `backoff_policy` describes an interval + // after which the message should be available to consume again. + // + // For FIFO messages, the interval should be relatively small because + // messages of the same message group would not be readily available until + // the prior one depletes its lifecycle. + optional RetryPolicy backoff_policy = 3; + + // Request timeout for RPCs excluding long-polling. + optional google.protobuf.Duration request_timeout = 4; + + oneof pub_sub { + Publishing publishing = 5; + + Subscription subscription = 6; + } + + // User agent details + UA user_agent = 7; + + Metric metric = 8; +} + +message Publishing { + // Publishing settings below here is appointed by client, thus it is + // unnecessary for server to push at present. + // + // List of topics to which messages will publish to. + repeated Resource topics = 1; + + // If the message body size exceeds `max_body_size`, broker servers would + // reject the request. As a result, it is advisable that Producer performs + // client-side check validation. + int32 max_body_size = 2; + + // When `validate_message_type` flag set `false`, no need to validate message's type + // with messageQueue's `accept_message_types` before publishing. + bool validate_message_type = 3; +} + +message Subscription { + // Subscription settings below here is appointed by client, thus it is + // unnecessary for server to push at present. + // + // Consumer group. + optional Resource group = 1; + + // Subscription for consumer. + repeated SubscriptionEntry subscriptions = 2; + + // Subscription settings below here are from server, it is essential for + // server to push. + // + // When FIFO flag is `true`, messages of the same message group are processed + // in first-in-first-out manner. + // + // Brokers will not deliver further messages of the same group until prior + // ones are completely acknowledged. + optional bool fifo = 3; + + // Message receive batch size here is essential for push consumer. + optional int32 receive_batch_size = 4; + + // Long-polling timeout for `ReceiveMessageRequest`, which is essential for + // push consumer. + optional google.protobuf.Duration long_polling_timeout = 5; +} + +message Metric { + // Indicates that if client should export local metrics to server. + bool on = 1; + + // The endpoint that client metrics should be exported to, which is required if the switch is on. + optional Endpoints endpoints = 2; +} + +enum QueryOffsetPolicy { + // Use this option if client wishes to playback all existing messages. + BEGINNING = 0; + + // Use this option if client wishes to skip all existing messages. + END = 1; + + // Use this option if time-based seek is targeted. + TIMESTAMP = 2; } \ No newline at end of file diff --git a/cpp/proto/apache/rocketmq/v2/service.proto b/cpp/proto/apache/rocketmq/v2/service.proto index 715594e3..1a3dbbe9 100644 --- a/cpp/proto/apache/rocketmq/v2/service.proto +++ b/cpp/proto/apache/rocketmq/v2/service.proto @@ -66,6 +66,8 @@ message SendResultEntry { string message_id = 2; string transaction_id = 3; int64 offset = 4; + // Unique handle to identify message to recall, support delay message for now. + string recall_handle = 5; } message SendMessageResponse { @@ -96,6 +98,8 @@ message ReceiveMessageRequest { optional google.protobuf.Duration invisible_duration = 5; // For message auto renew and clean bool auto_renew = 6; + optional google.protobuf.Duration long_polling_timeout = 7; + optional string attempt_id = 8; } message ReceiveMessageResponse { @@ -129,6 +133,7 @@ message AckMessageResultEntry { } message AckMessageResponse { + // RPC tier status, which is used to represent RPC-level errors including // authentication, authorization, throttling and other general failures. Status status = 1; @@ -145,18 +150,14 @@ message ForwardMessageToDeadLetterQueueRequest { int32 max_delivery_attempts = 6; } -message ForwardMessageToDeadLetterQueueResponse { - Status status = 1; -} +message ForwardMessageToDeadLetterQueueResponse { Status status = 1; } message HeartbeatRequest { optional Resource group = 1; ClientType client_type = 2; } -message HeartbeatResponse { - Status status = 1; -} +message HeartbeatResponse { Status status = 1; } message EndTransactionRequest { Resource topic = 1; @@ -167,13 +168,9 @@ message EndTransactionRequest { string trace_context = 6; } -message EndTransactionResponse { - Status status = 1; -} +message EndTransactionResponse { Status status = 1; } -message PrintThreadStackTraceCommand { - string nonce = 1; -} +message PrintThreadStackTraceCommand { string nonce = 1; } message ThreadStackTrace { string nonce = 1; @@ -194,92 +191,6 @@ message RecoverOrphanedTransactionCommand { string transaction_id = 2; } -message Publishing { - // Publishing settings below here is appointed by client, thus it is - // unnecessary for server to push at present. - // - // List of topics to which messages will publish to. - repeated Resource topics = 1; - - // If the message body size exceeds `max_body_size`, broker servers would - // reject the request. As a result, it is advisable that Producer performs - // client-side check validation. - int32 max_body_size = 2; - - // When `validate_message_type` flag set `false`, no need to validate message's type - // with messageQueue's `accept_message_types` before publishing. - bool validate_message_type = 3; -} - -message Subscription { - // Subscription settings below here is appointed by client, thus it is - // unnecessary for server to push at present. - // - // Consumer group. - optional Resource group = 1; - - // Subscription for consumer. - repeated SubscriptionEntry subscriptions = 2; - - // Subscription settings below here are from server, it is essential for - // server to push. - // - // When FIFO flag is `true`, messages of the same message group are processed - // in first-in-first-out manner. - // - // Brokers will not deliver further messages of the same group until prior - // ones are completely acknowledged. - optional bool fifo = 3; - - // Message receive batch size here is essential for push consumer. - optional int32 receive_batch_size = 4; - - // Long-polling timeout for `ReceiveMessageRequest`, which is essential for - // push consumer. - optional google.protobuf.Duration long_polling_timeout = 5; -} - -message Metric { - // Indicates that if client should export local metrics to server. - bool on = 1; - - // The endpoint that client metrics should be exported to, which is required if the switch is on. - optional Endpoints endpoints = 2; -} - -message Settings { - // Configurations for all clients. - optional ClientType client_type = 1; - - optional Endpoints access_point = 2; - - // If publishing of messages encounters throttling or server internal errors, - // publishers should implement automatic retries after progressive longer - // back-offs for consecutive errors. - // - // When processing message fails, `backoff_policy` describes an interval - // after which the message should be available to consume again. - // - // For FIFO messages, the interval should be relatively small because - // messages of the same message group would not be readily available until - // the prior one depletes its lifecycle. - optional RetryPolicy backoff_policy = 3; - - // Request timeout for RPCs excluding long-polling. - optional google.protobuf.Duration request_timeout = 4; - - oneof pub_sub { - Publishing publishing = 5; - - Subscription subscription = 6; - } - - // User agent details - UA user_agent = 7; - - Metric metric = 8; -} - message TelemetryCommand { optional Status status = 1; @@ -313,9 +224,7 @@ message NotifyClientTerminationRequest { optional Resource group = 1; } -message NotifyClientTerminationResponse { - Status status = 1; -} +message NotifyClientTerminationResponse { Status status = 1; } message ChangeInvisibleDurationRequest { Resource group = 1; @@ -338,6 +247,65 @@ message ChangeInvisibleDurationResponse { string receipt_handle = 2; } +message PullMessageRequest { + Resource group = 1; + MessageQueue message_queue = 2; + int64 offset = 3; + int32 batch_size = 4; + FilterExpression filter_expression = 5; + google.protobuf.Duration long_polling_timeout = 6; +} + +message PullMessageResponse { + oneof content { + Status status = 1; + Message message = 2; + int64 next_offset = 3; + } +} + +message UpdateOffsetRequest { + Resource group = 1; + MessageQueue message_queue = 2; + int64 offset = 3; +} + +message UpdateOffsetResponse { + Status status = 1; +} + +message GetOffsetRequest { + Resource group = 1; + MessageQueue message_queue = 2; +} + +message GetOffsetResponse { + Status status = 1; + int64 offset = 2; +} + +message QueryOffsetRequest { + MessageQueue message_queue = 1; + QueryOffsetPolicy query_offset_policy = 2; + optional google.protobuf.Timestamp timestamp = 3; +} + +message QueryOffsetResponse { + Status status = 1; + int64 offset = 2; +} + +message RecallMessageRequest { + Resource topic = 1; + // Refer to SendResultEntry. + string recall_handle = 2; +} + +message RecallMessageResponse { + Status status = 1; + string message_id = 2; +} + // For all the RPCs in MessagingService, the following error handling policies // apply: // @@ -349,6 +317,7 @@ message ChangeInvisibleDurationResponse { // common.status.code == `RESOURCE_EXHAUSTED`. If any unexpected server-side // errors raise, return a response with common.status.code == `INTERNAL`. service MessagingService { + // Queries the route entries of the requested topic in the perspective of the // given endpoints. On success, servers should return a collection of // addressable message-queues. Note servers may return customized route @@ -356,8 +325,7 @@ service MessagingService { // // If the requested topic doesn't exist, returns `NOT_FOUND`. // If the specific endpoints is empty, returns `INVALID_ARGUMENT`. - rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) { - } + rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {} // Producer or consumer sends HeartbeatRequest to servers periodically to // keep-alive. Additionally, it also reports client-side configuration, @@ -367,8 +335,7 @@ service MessagingService { // // If a client specifies a language that is not yet supported by servers, // returns `INVALID_ARGUMENT` - rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) { - } + rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {} // Delivers messages to brokers. // Clients may further: @@ -383,8 +350,7 @@ service MessagingService { // Returns message-id or transaction-id with status `OK` on success. // // If the destination topic doesn't exist, returns `NOT_FOUND`. - rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) { - } + rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {} // Queries the assigned route info of a topic for current consumer, // the returned assignment result is decided by server-side load balancer. @@ -418,18 +384,30 @@ service MessagingService { // // If the given receipt_handle is illegal or out of date, returns // `INVALID_ARGUMENT`. - rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) { - } + rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {} // Forwards one message to dead letter queue if the max delivery attempts is // exceeded by this message at client-side, return `OK` if success. rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest) - returns (ForwardMessageToDeadLetterQueueResponse) { - } + returns (ForwardMessageToDeadLetterQueueResponse) {} + + // PullMessage and ReceiveMessage RPCs serve a similar purpose, + // which is to attempt to get messages from the server, but with different semantics. + rpc PullMessage(PullMessageRequest) returns (stream PullMessageResponse) {} + + // Update the consumption progress of the designated queue of the + // consumer group to the remote. + rpc UpdateOffset(UpdateOffsetRequest) returns (UpdateOffsetResponse) {} + + // Query the consumption progress of the designated queue of the + // consumer group to the remote. + rpc GetOffset(GetOffsetRequest) returns (GetOffsetResponse) {} + + // Query the offset of the designated queue by the query offset policy. + rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {} // Commits or rollback one transactional message. - rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) { - } + rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {} // Once a client starts, it would immediately establishes bi-lateral stream // RPCs with brokers, reporting its settings as the initiative command. @@ -437,8 +415,7 @@ service MessagingService { // When servers have need of inspecting client status, they would issue // telemetry commands to clients. After executing received instructions, // clients shall report command execution results through client-side streams. - rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) { - } + rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {} // Notify the server that the client is terminated. rpc NotifyClientTermination(NotifyClientTerminationRequest) returns (NotifyClientTerminationResponse) { @@ -452,4 +429,10 @@ service MessagingService { // ChangeInvisibleDuration to lengthen invisible duration. rpc ChangeInvisibleDuration(ChangeInvisibleDurationRequest) returns (ChangeInvisibleDurationResponse) { } + + // Recall a message, + // for delay message, should recall before delivery time, like the rollback operation of transaction message, + // for normal message, not supported for now. + rpc RecallMessage(RecallMessageRequest) returns (RecallMessageResponse) { + } } \ No newline at end of file diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp index bb1e2e67..053f7723 100644 --- a/cpp/source/client/ClientManagerImpl.cpp +++ b/cpp/source/client/ClientManagerImpl.cpp @@ -177,7 +177,7 @@ void ClientManagerImpl::heartbeat(const std::string& target_host, const HeartbeatRequest& request, std::chrono::milliseconds timeout, const std::function<void(const std::error_code&, const HeartbeatResponse&)>& cb) { - SPDLOG_DEBUG("Prepare to send heartbeat to {}. Request: {}", target_host, request.DebugString()); + SPDLOG_DEBUG("Prepare to send heartbeat to {}. Request: {}", target_host, request.ShortDebugString()); auto client = getRpcClient(target_host, true); auto invocation_context = new InvocationContext<HeartbeatResponse>(); invocation_context->task_name = fmt::format("Heartbeat to {}", target_host); diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp index 27557cf2..74e8689c 100644 --- a/cpp/source/client/TelemetryBidiReactor.cpp +++ b/cpp/source/client/TelemetryBidiReactor.cpp @@ -301,7 +301,7 @@ void TelemetryBidiReactor::signalClose() { } void TelemetryBidiReactor::close() { - SPDLOG_INFO("{}#fireClose", peer_address_); + SPDLOG_DEBUG("{}#fireClose", peer_address_); { absl::MutexLock lk(&state_mtx_); @@ -316,14 +316,12 @@ void TelemetryBidiReactor::close() { } context_.TryCancel(); - { - // Acquire state lock + // Acquire state lock + while (StreamState::Closed != state_) { absl::MutexLock lk(&state_mtx_); - while (StreamState::Closed != state_) { - if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) { - SPDLOG_WARN("StreamState CondVar timed out before getting signalled: state={}", - static_cast<uint8_t>(state_)); - } + if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) { + SPDLOG_WARN("StreamState CondVar timed out before getting signalled: state={}", + static_cast<uint8_t>(state_)); } } } @@ -338,7 +336,7 @@ void TelemetryBidiReactor::close() { void TelemetryBidiReactor::OnDone(const grpc::Status& status) { SPDLOG_DEBUG("{}#OnDone, status.ok={}", peer_address_, status.ok()); if (!status.ok()) { - SPDLOG_WARN("{}#OnDone, status.error_code={}, status.error_message={}, status.error_details={}", peer_address_, + SPDLOG_DEBUG("{}#OnDone, status.error_code={}, status.error_message={}, status.error_details={}", peer_address_, status.error_code(), status.error_message(), status.error_details()); } { diff --git a/cpp/source/rocketmq/ClientImpl.cpp b/cpp/source/rocketmq/ClientImpl.cpp index a4026cf6..7f91b048 100644 --- a/cpp/source/rocketmq/ClientImpl.cpp +++ b/cpp/source/rocketmq/ClientImpl.cpp @@ -175,6 +175,8 @@ void ClientImpl::start() { } }; + // refer java sdk: set refresh interval to 5 minutes + // org.apache.rocketmq.client.java.impl.ClientSessionImpl#syncSettings0 telemetry_handle_ = client_manager_->getScheduler()->schedule( telemetry_functor, TELEMETRY_TASK_NAME, std::chrono::minutes(5), std::chrono::minutes(5)); @@ -401,8 +403,8 @@ void ClientImpl::heartbeat() { } SPDLOG_DEBUG("Heartbeat to {} OK", target); }; - client_manager_->heartbeat(target, metadata, request, absl::ToChronoMilliseconds(client_config_.request_timeout), - callback); + client_manager_->heartbeat(target, metadata, request, + absl::ToChronoMilliseconds(client_config_.request_timeout), callback); } } diff --git a/cpp/source/rocketmq/SimpleConsumer.cpp b/cpp/source/rocketmq/SimpleConsumer.cpp index a48a0e49..5ab92f4b 100644 --- a/cpp/source/rocketmq/SimpleConsumer.cpp +++ b/cpp/source/rocketmq/SimpleConsumer.cpp @@ -19,6 +19,7 @@ #include "SimpleConsumerImpl.h" #include "StaticNameServerResolver.h" +#include "rocketmq/ErrorCode.h" ROCKETMQ_NAMESPACE_BEGIN @@ -58,7 +59,7 @@ void SimpleConsumer::receive(std::size_t limit, auto callback = [&, mtx, cv](const std::error_code& code, const std::vector<MessageConstSharedPtr>& result) { { absl::MutexLock lk(mtx.get()); - if (code) { + if (code && code != ErrorCode::NoContent) { ec = code; SPDLOG_WARN("Failed to receive message. Cause: {}", code.message()); } diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp b/cpp/source/rocketmq/SimpleConsumerImpl.cpp index 7a1b3edf..df060793 100644 --- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp +++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp @@ -94,8 +94,14 @@ void SimpleConsumerImpl::start() { simple_consumer->refreshAssignments0(); } }; - refresh_assignment_task_ = manager()->getScheduler()->schedule(refresh_assignment_task, "RefreshAssignmentTask", - std::chrono::seconds(3), std::chrono::seconds(3)); + + // refer java sdk: set refresh interval to 30 seconds + // org.apache.rocketmq.client.java.impl.ClientImpl#startUp + refresh_assignment_task_ = manager()->getScheduler()->schedule( + refresh_assignment_task, "RefreshAssignmentTask", + std::chrono::minutes(5), std::chrono::seconds(5)); + + client_manager_->addClientObserver(shared_from_this()); } } @@ -307,12 +313,21 @@ void SimpleConsumerImpl::receive(std::size_t limit, request.set_auto_renew(false); request.mutable_group()->CopyFrom(config().subscriber.group); request.mutable_message_queue()->CopyFrom(assignment.message_queue()); - request.set_batch_size(limit); + request.set_batch_size((int32_t) limit); + + request.mutable_filter_expression()->set_type(rmq::FilterType::TAG); + request.mutable_filter_expression()->set_expression("*"); - auto duration = google::protobuf::util::TimeUtil::MillisecondsToDuration(invisible_duration.count()); + auto invisible_duration_request = + google::protobuf::util::TimeUtil::MillisecondsToDuration(invisible_duration.count()); + request.mutable_invisible_duration()->set_nanos(invisible_duration_request.nanos()); + request.mutable_invisible_duration()->set_seconds(invisible_duration_request.seconds()); - request.mutable_invisible_duration()->set_nanos(duration.nanos()); - request.mutable_invisible_duration()->set_seconds(duration.seconds()); + auto await_duration_request = + google::protobuf::util::TimeUtil::MillisecondsToDuration( + MixAll::millisecondsOf(long_polling_duration_)); + request.mutable_long_polling_timeout()->set_nanos(await_duration_request.nanos()); + request.mutable_long_polling_timeout()->set_seconds(await_duration_request.seconds()); auto cb = [callback](const std::error_code& ec, const ReceiveMessageResult& result) { std::vector<MessageConstSharedPtr> messages; @@ -324,7 +339,6 @@ void SimpleConsumerImpl::receive(std::size_t limit, callback(ec, result.messages); }; - SPDLOG_DEBUG("ReceiveMessage.polling_timeout: {}ms", MixAll::millisecondsOf(long_polling_duration_)); manager()->receiveMessage(target, metadata, request, long_polling_duration_ + absl::ToChronoMilliseconds(requestTimeout()), cb); }