This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new fee5a006 [ISSUE #1043] [Rust] Improve message status and timestamp
handling (#1044)
fee5a006 is described below
commit fee5a0068fe010db9114a2298d70b5b53d91eecd
Author: Adam Basfop Cavendish <[email protected]>
AuthorDate: Thu Jul 31 11:28:10 2025 +0800
[ISSUE #1043] [Rust] Improve message status and timestamp handling (#1044)
- Add handle_receive_message_status() function to properly handle different
status codes
- Implement proper delivery timestamp processing for received messages
- Update client.rs and push_consumer.rs to use new status handling logic
- Add comprehensive status code mapping with appropriate error types
- Include MessageNotFound as valid status for receive operations
- Add unit tests for new status handling functionality
---
rust/src/client.rs | 40 ++++++---
rust/src/push_consumer.rs | 23 +++--
rust/src/util.rs | 217 ++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 261 insertions(+), 19 deletions(-)
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 39686821..0faefcb4 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -36,15 +36,15 @@ use crate::model::message::AckMessageEntry;
use crate::pb;
use crate::pb::receive_message_response::Content;
use crate::pb::{
- AckMessageRequest, AckMessageResultEntry, ChangeInvisibleDurationRequest,
Code,
- FilterExpression, HeartbeatRequest, HeartbeatResponse, Message,
MessageQueue,
- NotifyClientTerminationRequest, QueryRouteRequest, ReceiveMessageRequest,
Resource,
- SendMessageRequest, TelemetryCommand,
+ AckMessageRequest, AckMessageResultEntry, ChangeInvisibleDurationRequest,
FilterExpression,
+ HeartbeatRequest, HeartbeatResponse, Message, MessageQueue,
NotifyClientTerminationRequest,
+ QueryRouteRequest, ReceiveMessageRequest, Resource, SendMessageRequest,
Status,
+ TelemetryCommand,
};
use crate::session::RPCClient;
#[double]
use crate::session::Session;
-use crate::util::{handle_response_status, select_message_queue};
+use crate::util::{handle_receive_message_status, handle_response_status,
select_message_queue};
#[derive(Debug)]
pub(crate) struct Client {
@@ -400,20 +400,34 @@ impl Client {
let responses = rpc_client.receive_message(request).await?;
let mut messages = Vec::with_capacity(batch_size as usize);
+ let mut status: Option<Status> = None;
+ let mut delivery_timestamp: Option<prost_types::Timestamp> = None;
+
for response in responses {
match response.content.unwrap() {
- Content::Status(status) => {
- if status.code() == Code::MessageNotFound {
- return Ok(vec![]);
- }
- handle_response_status(Some(status),
OPERATION_RECEIVE_MESSAGE)?;
+ Content::Status(response_status) => {
+ status = Some(response_status);
}
Content::Message(message) => {
messages.push(message);
}
- Content::DeliveryTimestamp(_) => {}
+ Content::DeliveryTimestamp(timestamp) => {
+ delivery_timestamp = Some(timestamp);
+ }
}
}
+
+ if let Some(status) = status {
+ handle_receive_message_status(&status, OPERATION_RECEIVE_MESSAGE)?;
+ }
+ if let Some(ref delivery_timestamp) = delivery_timestamp {
+ for message in &mut messages {
+ if let Some(system_properties) =
message.system_properties.as_mut() {
+ system_properties.delivery_timestamp =
Some(delivery_timestamp.clone());
+ }
+ }
+ }
+
Ok(messages)
}
@@ -1146,8 +1160,8 @@ pub(crate) mod tests {
assert!(receive_result.is_err());
let error = receive_result.unwrap_err();
- assert_eq!(error.kind, ErrorKind::Server);
- assert_eq!(error.message, "server return an error");
+ assert_eq!(error.kind, ErrorKind::Config);
+ assert_eq!(error.message, "bad request");
assert_eq!(error.operation, "client.receive_message");
}
diff --git a/rust/src/push_consumer.rs b/rust/src/push_consumer.rs
index d3f9a23a..8672a4a3 100644
--- a/rust/src/push_consumer.rs
+++ b/rust/src/push_consumer.rs
@@ -37,13 +37,14 @@ use crate::pb::receive_message_response::Content;
use crate::pb::{
AckMessageRequest, Assignment, ChangeInvisibleDurationRequest,
ForwardMessageToDeadLetterQueueRequest, QueryAssignmentRequest,
ReceiveMessageRequest,
- Resource,
+ Resource, Status,
};
use crate::session::RPCClient;
#[double]
use crate::session::Session;
use crate::util::{
- build_endpoints_by_message_queue, build_push_consumer_settings,
handle_response_status,
+ build_endpoints_by_message_queue, build_push_consumer_settings,
handle_receive_message_status,
+ handle_response_status,
};
use tracing::{debug, error, info, warn};
@@ -309,15 +310,20 @@ impl PushConsumer {
&message_queue.to_pb_message_queue(),
OPERATION_RECEIVE_MESSAGE,
)?;
+ let mut status: Option<Status> = None;
+ let mut _delivery_timestamp: Option<prost_types::Timestamp> = None;
+
for response in responses {
if response.content.is_some() {
let content = response.content.unwrap();
match content {
- Content::Status(status) => {
- warn!("unhandled status message {:?}", status);
+ Content::Status(response_status) => {
+ // Store the status for later processing
+ status = Some(response_status);
}
- Content::DeliveryTimestamp(_) => {
- warn!("unhandled delivery timestamp message");
+ Content::DeliveryTimestamp(timestamp) => {
+ // Store the delivery timestamp for later use
+ _delivery_timestamp = Some(timestamp);
}
Content::Message(message) => {
messages.push(
@@ -333,6 +339,11 @@ impl PushConsumer {
}
}
}
+
+ // Handle the status message properly
+ if let Some(status) = status {
+ handle_receive_message_status(&status, OPERATION_RECEIVE_MESSAGE)?;
+ }
Ok(messages)
}
}
diff --git a/rust/src/util.rs b/rust/src/util.rs
index a7102799..e0312b13 100644
--- a/rust/src/util.rs
+++ b/rust/src/util.rs
@@ -223,6 +223,136 @@ pub fn handle_response_status(
Ok(())
}
+/// Handle status messages in receive message responses, similar to Java
StatusChecker
+/// This function handles status codes appropriately based on the context
+pub fn handle_receive_message_status(
+ status: &Status,
+ operation: &'static str,
+) -> Result<(), ClientError> {
+ let code = match Code::from_i32(status.code) {
+ Some(code) => code,
+ None => {
+ // Handle unrecognized status codes.
+ tracing::warn!(
+ "Unrecognized status code={}, statusMessage={}, operation={}",
+ status.code,
+ status.message,
+ operation
+ );
+ return Err(
+ ClientError::new(ErrorKind::Server, "unsupported status code",
operation)
+ .with_context("code", format!("{}", status.code))
+ .with_context("message", status.message.clone()),
+ );
+ }
+ };
+
+ match code {
+ // Unused, unrecognized status codes
+ Code::Unspecified
+ | Code::PreconditionFailed
+ | Code::NotImplemented
+ | Code::FailedToConsumeMessage => {
+ Err(
+ ClientError::new(ErrorKind::Server, "unsupported status code",
operation)
+ .with_context("code", format!("{}", status.code))
+ .with_context("message", status.message.clone()),
+ )
+ }
+
+ // OK and MULTIPLE_RESULTS are acceptable for receive message
+ Code::Ok | Code::MultipleResults => Ok(()),
+ // MESSAGE_NOT_FOUND is acceptable for receive message - no messages
available
+ // This is not an error, just indicates no new messages
+ Code::MessageNotFound => Ok(()),
+
+ Code::BadRequest
+ | Code::IllegalAccessPoint
+ | Code::IllegalTopic
+ | Code::IllegalConsumerGroup
+ | Code::IllegalMessageTag
+ | Code::IllegalMessageKey
+ | Code::IllegalMessageGroup
+ | Code::IllegalMessagePropertyKey
+ | Code::InvalidTransactionId
+ | Code::IllegalMessageId
+ | Code::IllegalFilterExpression
+ | Code::IllegalInvisibleTime
+ | Code::IllegalDeliveryTime
+ | Code::InvalidReceiptHandle
+ | Code::MessagePropertyConflictWithType
+ | Code::UnrecognizedClientType
+ | Code::MessageCorrupted
+ | Code::ClientIdRequired
+ | Code::IllegalPollingTime => {
+ Err(
+ ClientError::new(ErrorKind::Config, "bad request", operation)
+ .with_context("code", format!("{}", status.code))
+ .with_context("message", status.message.clone()),
+ )
+ }
+ Code::Unauthorized => Err(
+ ClientError::new(ErrorKind::Server, "unauthorized", operation)
+ .with_context("code", format!("{}", status.code))
+ .with_context("message", status.message.clone()),
+ ),
+ Code::PaymentRequired => {
+ Err(
+ ClientError::new(ErrorKind::Server, "payment required",
operation)
+ .with_context("code", format!("{}", status.code))
+ .with_context("message", status.message.clone()),
+ )
+ }
+ Code::Forbidden => Err(ClientError::new(ErrorKind::Server,
"forbidden", operation)
+ .with_context("code", format!("{}", status.code))
+ .with_context("message", status.message.clone())),
+ Code::NotFound | Code::TopicNotFound | Code::ConsumerGroupNotFound => {
+ Err(ClientError::new(ErrorKind::Server, "not found", operation)
+ .with_context("code", format!("{}", status.code))
+ .with_context("message", status.message.clone()))
+ }
+ Code::PayloadTooLarge | Code::MessageBodyTooLarge => {
+ Err(
+ ClientError::new(ErrorKind::Server, "payload too large",
operation)
+ .with_context("code", format!("{}", status.code))
+ .with_context("message", status.message.clone()),
+ )
+ }
+ Code::TooManyRequests => {
+ Err(
+ ClientError::new(ErrorKind::Server, "too many requests",
operation)
+ .with_context("code", format!("{}", status.code))
+ .with_context("message", status.message.clone()),
+ )
+ }
+ Code::RequestHeaderFieldsTooLarge | Code::MessagePropertiesTooLarge =>
{
+ Err(ClientError::new(
+ ErrorKind::Server,
+ "request header fields too large",
+ operation,
+ )
+ .with_context("code", format!("{}", status.code))
+ .with_context("message", status.message.clone()))
+ }
+ Code::InternalError | Code::InternalServerError | Code::HaNotAvailable
=> Err(
+ ClientError::new(ErrorKind::Server, "internal error", operation)
+ .with_context("code", format!("{}", status.code))
+ .with_context("message", status.message.clone()),
+ ),
+ Code::RequestTimeout
+ | Code::ProxyTimeout
+ | Code::MasterPersistenceTimeout
+ | Code::SlavePersistenceTimeout => {
+ Err(ClientError::new(ErrorKind::Server, "timeout", operation))
+ }
+ Code::Unsupported | Code::VersionUnsupported |
Code::VerifyFifoMessageUnsupported => Err(
+ ClientError::new(ErrorKind::Server, "unsupported", operation)
+ .with_context("code", format!("{}", status.code))
+ .with_context("message", status.message.clone()),
+ ),
+ }
+}
+
#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicUsize;
@@ -388,4 +518,91 @@ mod tests {
);
assert!(result.is_ok(), "should not return error when status is Ok");
}
+
+ #[test]
+ fn test_handle_receive_message_status() {
+ // Test OK status
+ let result = handle_receive_message_status(
+ &Status {
+ code: Code::Ok as i32,
+ message: "OK".to_string(),
+ },
+ "test",
+ );
+ assert!(result.is_ok(), "should not return error when status is Ok");
+
+ // Test MultipleResults status
+ let result = handle_receive_message_status(
+ &Status {
+ code: Code::MultipleResults as i32,
+ message: "Multiple results".to_string(),
+ },
+ "test",
+ );
+ assert!(
+ result.is_ok(),
+ "should not return error when status is MultipleResults"
+ );
+
+ // Test MessageNotFound status - should be OK for receive message
+ let result = handle_receive_message_status(
+ &Status {
+ code: Code::MessageNotFound as i32,
+ message: "no new message".to_string(),
+ },
+ "test",
+ );
+ assert!(
+ result.is_ok(),
+ "should not return error when status is MessageNotFound"
+ );
+
+ // Test BadRequest status
+ let result = handle_receive_message_status(
+ &Status {
+ code: Code::BadRequest as i32,
+ message: "bad request".to_string(),
+ },
+ "test",
+ );
+ assert!(
+ result.is_err(),
+ "should return error when status is BadRequest"
+ );
+ let result = result.unwrap_err();
+ assert_eq!(result.kind, ErrorKind::Config);
+ assert_eq!(result.message, "bad request");
+
+ // Test NotFound status
+ let result = handle_receive_message_status(
+ &Status {
+ code: Code::NotFound as i32,
+ message: "not found".to_string(),
+ },
+ "test",
+ );
+ assert!(
+ result.is_err(),
+ "should return error when status is NotFound"
+ );
+ let result = result.unwrap_err();
+ assert_eq!(result.kind, ErrorKind::Server);
+ assert_eq!(result.message, "not found");
+
+ // Test Unauthorized status
+ let result = handle_receive_message_status(
+ &Status {
+ code: Code::Unauthorized as i32,
+ message: "unauthorized".to_string(),
+ },
+ "test",
+ );
+ assert!(
+ result.is_err(),
+ "should return error when status is Unauthorized"
+ );
+ let result = result.unwrap_err();
+ assert_eq!(result.kind, ErrorKind::Server);
+ assert_eq!(result.message, "unauthorized");
+ }
}