This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new fee5a006 [ISSUE #1043] [Rust] Improve message status and timestamp handling (#1044)
fee5a006 is described below

commit fee5a0068fe010db9114a2298d70b5b53d91eecd
Author: Adam Basfop Cavendish <[email protected]>
AuthorDate: Thu Jul 31 11:28:10 2025 +0800

    [ISSUE #1043] [Rust] Improve message status and timestamp handling (#1044)
    
    - Add handle_receive_message_status() function to properly handle different status codes
    - Implement proper delivery timestamp processing for received messages
    - Update client.rs and push_consumer.rs to use new status handling logic
    - Add comprehensive status code mapping with appropriate error types
    - Include MessageNotFound as valid status for receive operations
    - Add unit tests for new status handling functionality
---
 rust/src/client.rs        |  40 ++++++---
 rust/src/push_consumer.rs |  23 +++--
 rust/src/util.rs          | 217 ++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 261 insertions(+), 19 deletions(-)

diff --git a/rust/src/client.rs b/rust/src/client.rs
index 39686821..0faefcb4 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -36,15 +36,15 @@ use crate::model::message::AckMessageEntry;
 use crate::pb;
 use crate::pb::receive_message_response::Content;
 use crate::pb::{
-    AckMessageRequest, AckMessageResultEntry, ChangeInvisibleDurationRequest, 
Code,
-    FilterExpression, HeartbeatRequest, HeartbeatResponse, Message, 
MessageQueue,
-    NotifyClientTerminationRequest, QueryRouteRequest, ReceiveMessageRequest, 
Resource,
-    SendMessageRequest, TelemetryCommand,
+    AckMessageRequest, AckMessageResultEntry, ChangeInvisibleDurationRequest, 
FilterExpression,
+    HeartbeatRequest, HeartbeatResponse, Message, MessageQueue, 
NotifyClientTerminationRequest,
+    QueryRouteRequest, ReceiveMessageRequest, Resource, SendMessageRequest, 
Status,
+    TelemetryCommand,
 };
 use crate::session::RPCClient;
 #[double]
 use crate::session::Session;
-use crate::util::{handle_response_status, select_message_queue};
+use crate::util::{handle_receive_message_status, handle_response_status, 
select_message_queue};
 
 #[derive(Debug)]
 pub(crate) struct Client {
@@ -400,20 +400,34 @@ impl Client {
         let responses = rpc_client.receive_message(request).await?;
 
         let mut messages = Vec::with_capacity(batch_size as usize);
+        let mut status: Option<Status> = None;
+        let mut delivery_timestamp: Option<prost_types::Timestamp> = None;
+
         for response in responses {
             match response.content.unwrap() {
-                Content::Status(status) => {
-                    if status.code() == Code::MessageNotFound {
-                        return Ok(vec![]);
-                    }
-                    handle_response_status(Some(status), 
OPERATION_RECEIVE_MESSAGE)?;
+                Content::Status(response_status) => {
+                    status = Some(response_status);
                 }
                 Content::Message(message) => {
                     messages.push(message);
                 }
-                Content::DeliveryTimestamp(_) => {}
+                Content::DeliveryTimestamp(timestamp) => {
+                    delivery_timestamp = Some(timestamp);
+                }
             }
         }
+
+        if let Some(status) = status {
+            handle_receive_message_status(&status, OPERATION_RECEIVE_MESSAGE)?;
+        }
+        if let Some(ref delivery_timestamp) = delivery_timestamp {
+            for message in &mut messages {
+                if let Some(system_properties) = 
message.system_properties.as_mut() {
+                    system_properties.delivery_timestamp = 
Some(delivery_timestamp.clone());
+                }
+            }
+        }
+
         Ok(messages)
     }
 
@@ -1146,8 +1160,8 @@ pub(crate) mod tests {
         assert!(receive_result.is_err());
 
         let error = receive_result.unwrap_err();
-        assert_eq!(error.kind, ErrorKind::Server);
-        assert_eq!(error.message, "server return an error");
+        assert_eq!(error.kind, ErrorKind::Config);
+        assert_eq!(error.message, "bad request");
         assert_eq!(error.operation, "client.receive_message");
     }
 
diff --git a/rust/src/push_consumer.rs b/rust/src/push_consumer.rs
index d3f9a23a..8672a4a3 100644
--- a/rust/src/push_consumer.rs
+++ b/rust/src/push_consumer.rs
@@ -37,13 +37,14 @@ use crate::pb::receive_message_response::Content;
 use crate::pb::{
     AckMessageRequest, Assignment, ChangeInvisibleDurationRequest,
     ForwardMessageToDeadLetterQueueRequest, QueryAssignmentRequest, 
ReceiveMessageRequest,
-    Resource,
+    Resource, Status,
 };
 use crate::session::RPCClient;
 #[double]
 use crate::session::Session;
 use crate::util::{
-    build_endpoints_by_message_queue, build_push_consumer_settings, 
handle_response_status,
+    build_endpoints_by_message_queue, build_push_consumer_settings, 
handle_receive_message_status,
+    handle_response_status,
 };
 use tracing::{debug, error, info, warn};
 
@@ -309,15 +310,20 @@ impl PushConsumer {
             &message_queue.to_pb_message_queue(),
             OPERATION_RECEIVE_MESSAGE,
         )?;
+        let mut status: Option<Status> = None;
+        let mut _delivery_timestamp: Option<prost_types::Timestamp> = None;
+
         for response in responses {
             if response.content.is_some() {
                 let content = response.content.unwrap();
                 match content {
-                    Content::Status(status) => {
-                        warn!("unhandled status message {:?}", status);
+                    Content::Status(response_status) => {
+                        // Store the status for later processing
+                        status = Some(response_status);
                     }
-                    Content::DeliveryTimestamp(_) => {
-                        warn!("unhandled delivery timestamp message");
+                    Content::DeliveryTimestamp(timestamp) => {
+                        // Store the delivery timestamp for later use
+                        _delivery_timestamp = Some(timestamp);
                     }
                     Content::Message(message) => {
                         messages.push(
@@ -333,6 +339,11 @@ impl PushConsumer {
                 }
             }
         }
+
+        // Handle the status message properly
+        if let Some(status) = status {
+            handle_receive_message_status(&status, OPERATION_RECEIVE_MESSAGE)?;
+        }
         Ok(messages)
     }
 }
diff --git a/rust/src/util.rs b/rust/src/util.rs
index a7102799..e0312b13 100644
--- a/rust/src/util.rs
+++ b/rust/src/util.rs
@@ -223,6 +223,136 @@ pub fn handle_response_status(
     Ok(())
 }
 
+/// Handle status messages in receive message responses, similar to Java 
StatusChecker
+/// This function handles status codes appropriately based on the context
+pub fn handle_receive_message_status(
+    status: &Status,
+    operation: &'static str,
+) -> Result<(), ClientError> {
+    let code = match Code::from_i32(status.code) {
+        Some(code) => code,
+        None => {
+            // Handle unrecognized status codes.
+            tracing::warn!(
+                "Unrecognized status code={}, statusMessage={}, operation={}",
+                status.code,
+                status.message,
+                operation
+            );
+            return Err(
+                ClientError::new(ErrorKind::Server, "unsupported status code", 
operation)
+                    .with_context("code", format!("{}", status.code))
+                    .with_context("message", status.message.clone()),
+            );
+        }
+    };
+
+    match code {
+        // Unused, unrecognized status codes
+        Code::Unspecified
+        | Code::PreconditionFailed
+        | Code::NotImplemented
+        | Code::FailedToConsumeMessage => {
+            Err(
+                ClientError::new(ErrorKind::Server, "unsupported status code", 
operation)
+                    .with_context("code", format!("{}", status.code))
+                    .with_context("message", status.message.clone()),
+            )
+        }
+
+        // OK and MULTIPLE_RESULTS are acceptable for receive message
+        Code::Ok | Code::MultipleResults => Ok(()),
+        // MESSAGE_NOT_FOUND is acceptable for receive message - no messages 
available
+        // This is not an error, just indicates no new messages
+        Code::MessageNotFound => Ok(()),
+
+        Code::BadRequest
+        | Code::IllegalAccessPoint
+        | Code::IllegalTopic
+        | Code::IllegalConsumerGroup
+        | Code::IllegalMessageTag
+        | Code::IllegalMessageKey
+        | Code::IllegalMessageGroup
+        | Code::IllegalMessagePropertyKey
+        | Code::InvalidTransactionId
+        | Code::IllegalMessageId
+        | Code::IllegalFilterExpression
+        | Code::IllegalInvisibleTime
+        | Code::IllegalDeliveryTime
+        | Code::InvalidReceiptHandle
+        | Code::MessagePropertyConflictWithType
+        | Code::UnrecognizedClientType
+        | Code::MessageCorrupted
+        | Code::ClientIdRequired
+        | Code::IllegalPollingTime => {
+            Err(
+                ClientError::new(ErrorKind::Config, "bad request", operation)
+                    .with_context("code", format!("{}", status.code))
+                    .with_context("message", status.message.clone()),
+            )
+        }
+        Code::Unauthorized => Err(
+            ClientError::new(ErrorKind::Server, "unauthorized", operation)
+                .with_context("code", format!("{}", status.code))
+                .with_context("message", status.message.clone()),
+        ),
+        Code::PaymentRequired => {
+            Err(
+                ClientError::new(ErrorKind::Server, "payment required", 
operation)
+                    .with_context("code", format!("{}", status.code))
+                    .with_context("message", status.message.clone()),
+            )
+        }
+        Code::Forbidden => Err(ClientError::new(ErrorKind::Server, 
"forbidden", operation)
+            .with_context("code", format!("{}", status.code))
+            .with_context("message", status.message.clone())),
+        Code::NotFound | Code::TopicNotFound | Code::ConsumerGroupNotFound => {
+            Err(ClientError::new(ErrorKind::Server, "not found", operation)
+                .with_context("code", format!("{}", status.code))
+                .with_context("message", status.message.clone()))
+        }
+        Code::PayloadTooLarge | Code::MessageBodyTooLarge => {
+            Err(
+                ClientError::new(ErrorKind::Server, "payload too large", 
operation)
+                    .with_context("code", format!("{}", status.code))
+                    .with_context("message", status.message.clone()),
+            )
+        }
+        Code::TooManyRequests => {
+            Err(
+                ClientError::new(ErrorKind::Server, "too many requests", 
operation)
+                    .with_context("code", format!("{}", status.code))
+                    .with_context("message", status.message.clone()),
+            )
+        }
+        Code::RequestHeaderFieldsTooLarge | Code::MessagePropertiesTooLarge => 
{
+            Err(ClientError::new(
+                ErrorKind::Server,
+                "request header fields too large",
+                operation,
+            )
+            .with_context("code", format!("{}", status.code))
+            .with_context("message", status.message.clone()))
+        }
+        Code::InternalError | Code::InternalServerError | Code::HaNotAvailable 
=> Err(
+            ClientError::new(ErrorKind::Server, "internal error", operation)
+                .with_context("code", format!("{}", status.code))
+                .with_context("message", status.message.clone()),
+        ),
+        Code::RequestTimeout
+        | Code::ProxyTimeout
+        | Code::MasterPersistenceTimeout
+        | Code::SlavePersistenceTimeout => {
+            Err(ClientError::new(ErrorKind::Server, "timeout", operation))
+        }
+        Code::Unsupported | Code::VersionUnsupported | 
Code::VerifyFifoMessageUnsupported => Err(
+            ClientError::new(ErrorKind::Server, "unsupported", operation)
+                .with_context("code", format!("{}", status.code))
+                .with_context("message", status.message.clone()),
+        ),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::atomic::AtomicUsize;
@@ -388,4 +518,91 @@ mod tests {
         );
         assert!(result.is_ok(), "should not return error when status is Ok");
     }
+
+    #[test]
+    fn test_handle_receive_message_status() {
+        // Test OK status
+        let result = handle_receive_message_status(
+            &Status {
+                code: Code::Ok as i32,
+                message: "OK".to_string(),
+            },
+            "test",
+        );
+        assert!(result.is_ok(), "should not return error when status is Ok");
+
+        // Test MultipleResults status
+        let result = handle_receive_message_status(
+            &Status {
+                code: Code::MultipleResults as i32,
+                message: "Multiple results".to_string(),
+            },
+            "test",
+        );
+        assert!(
+            result.is_ok(),
+            "should not return error when status is MultipleResults"
+        );
+
+        // Test MessageNotFound status - should be OK for receive message
+        let result = handle_receive_message_status(
+            &Status {
+                code: Code::MessageNotFound as i32,
+                message: "no new message".to_string(),
+            },
+            "test",
+        );
+        assert!(
+            result.is_ok(),
+            "should not return error when status is MessageNotFound"
+        );
+
+        // Test BadRequest status
+        let result = handle_receive_message_status(
+            &Status {
+                code: Code::BadRequest as i32,
+                message: "bad request".to_string(),
+            },
+            "test",
+        );
+        assert!(
+            result.is_err(),
+            "should return error when status is BadRequest"
+        );
+        let result = result.unwrap_err();
+        assert_eq!(result.kind, ErrorKind::Config);
+        assert_eq!(result.message, "bad request");
+
+        // Test NotFound status
+        let result = handle_receive_message_status(
+            &Status {
+                code: Code::NotFound as i32,
+                message: "not found".to_string(),
+            },
+            "test",
+        );
+        assert!(
+            result.is_err(),
+            "should return error when status is NotFound"
+        );
+        let result = result.unwrap_err();
+        assert_eq!(result.kind, ErrorKind::Server);
+        assert_eq!(result.message, "not found");
+
+        // Test Unauthorized status
+        let result = handle_receive_message_status(
+            &Status {
+                code: Code::Unauthorized as i32,
+                message: "unauthorized".to_string(),
+            },
+            "test",
+        );
+        assert!(
+            result.is_err(),
+            "should return error when status is Unauthorized"
+        );
+        let result = result.unwrap_err();
+        assert_eq!(result.kind, ErrorKind::Server);
+        assert_eq!(result.message, "unauthorized");
+    }
 }

Reply via email to