This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 65b70ed7 [rust]support message type check on rust client (#658)
65b70ed7 is described below

commit 65b70ed73259e8667e75a94e5162f75e91b4bf1b
Author: Qiping Luo <rocky55...@163.com>
AuthorDate: Thu Jan 25 14:51:24 2024 +0800

    [rust]support message type check on rust client (#658)
    
    * support message type check on rust client
    
    * update test cases
    
    * simplify MessageType conversion
    
    * remove unused function
    
    * fix code fmt issue
    
    * fix cargo fmt issue
    
    * Use strong type in application code and primitive type for wire data only
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    ---------
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    Co-authored-by: Li Zhanhui <lizhan...@gmail.com>
---
 rust/src/error.rs         |  3 +++
 rust/src/model/message.rs | 29 +++++++++++++++++++++++++++++
 rust/src/producer.rs      | 46 +++++++++++++++++++++++++++++++---------------
 3 files changed, 63 insertions(+), 15 deletions(-)

diff --git a/rust/src/error.rs b/rust/src/error.rs
index 59d1eeb4..5210842e 100644
--- a/rust/src/error.rs
+++ b/rust/src/error.rs
@@ -33,6 +33,9 @@ pub enum ErrorKind {
     #[error("Message is invalid")]
     InvalidMessage,
 
+    #[error("Message type not match with topic accept message type")]
+    MessageTypeNotMatch,
+
     #[error("Server error")]
     Server,
 
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
index 28c232e8..1defcd89 100644
--- a/rust/src/model/message.rs
+++ b/rust/src/model/message.rs
@@ -21,9 +21,18 @@ use std::collections::HashMap;
 
 use crate::error::{ClientError, ErrorKind};
 use crate::model::common::Endpoints;
+use crate::model::message::MessageType::{DELAY, FIFO, NORMAL, TRANSACTION};
 use crate::model::message_id::UNIQ_ID_GENERATOR;
 use crate::pb;
 
+#[derive(Clone, Copy, Debug)]
+pub enum MessageType {
+    NORMAL = 1,
+    FIFO = 2,
+    DELAY = 3,
+    TRANSACTION = 4,
+}
+
 /// [`Message`] is the data model for sending.
 pub trait Message {
     fn take_message_id(&mut self) -> String;
@@ -35,6 +44,17 @@ pub trait Message {
     fn take_message_group(&mut self) -> Option<String>;
     fn take_delivery_timestamp(&mut self) -> Option<i64>;
     fn transaction_enabled(&mut self) -> bool;
+    fn get_message_type(&self) -> MessageType;
+}
+
+pub trait MessageTypeAware {
+    fn accept_type(&self, message_type: MessageType) -> bool;
+}
+
+impl MessageTypeAware for pb::MessageQueue {
+    fn accept_type(&self, message_type: MessageType) -> bool {
+        self.accept_message_types.contains(&(message_type as i32))
+    }
 }
 
 pub(crate) struct MessageImpl {
@@ -47,6 +67,7 @@ pub(crate) struct MessageImpl {
     pub(crate) message_group: Option<String>,
     pub(crate) delivery_timestamp: Option<i64>,
     pub(crate) transaction_enabled: bool,
+    pub(crate) message_type: MessageType,
 }
 
 impl Message for MessageImpl {
@@ -85,6 +106,10 @@ impl Message for MessageImpl {
     fn transaction_enabled(&mut self) -> bool {
         self.transaction_enabled
     }
+
+    fn get_message_type(&self) -> MessageType {
+        self.message_type
+    }
 }
 
 /// [`MessageBuilder`] is the builder for [`Message`].
@@ -108,6 +133,7 @@ impl MessageBuilder {
                 message_group: None,
                 delivery_timestamp: None,
                 transaction_enabled: false,
+                message_type: NORMAL,
             },
         }
     }
@@ -135,6 +161,7 @@ impl MessageBuilder {
                 message_group: Some(message_group.into()),
                 delivery_timestamp: None,
                 transaction_enabled: false,
+                message_type: FIFO,
             },
         }
     }
@@ -162,6 +189,7 @@ impl MessageBuilder {
                 message_group: None,
                 delivery_timestamp: Some(delay_time),
                 transaction_enabled: false,
+                message_type: DELAY,
             },
         }
     }
@@ -184,6 +212,7 @@ impl MessageBuilder {
                 message_group: None,
                 delivery_timestamp: None,
                 transaction_enabled: true,
+                message_type: TRANSACTION,
             },
         }
     }
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index 7e3f3996..2a69f079 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -26,9 +26,9 @@ use crate::client::Client;
 use crate::conf::{ClientOption, ProducerOption};
 use crate::error::{ClientError, ErrorKind};
 use crate::model::common::{ClientType, SendReceipt};
-use crate::model::message;
+use crate::model::message::{self, MessageTypeAware};
 use crate::model::transaction::{Transaction, TransactionChecker, 
TransactionImpl};
-use crate::pb::{Encoding, MessageType, Resource, SystemProperties};
+use crate::pb::{Encoding, Resource, SystemProperties};
 use crate::util::{
     build_endpoints_by_message_queue, build_producer_settings, 
select_message_queue,
     select_message_queue_by_message_group, HOST_NAME,
@@ -172,20 +172,16 @@ impl Producer {
                 .take_delivery_timestamp()
                 .map(|seconds| Timestamp { seconds, nanos: 0 });
 
-            let message_type = if message.transaction_enabled() {
+            if message.transaction_enabled() {
                 message_group = None;
                 delivery_timestamp = None;
-                MessageType::Transaction as i32
             } else if delivery_timestamp.is_some() {
                 message_group = None;
-                MessageType::Delay as i32
             } else if message_group.is_some() {
                 delivery_timestamp = None;
-                MessageType::Fifo as i32
-            } else {
-                MessageType::Normal as i32
             };
 
+            // TODO: use a converter trait From or TryFrom
             let pb_message = pb::Message {
                 topic: Some(Resource {
                     name: message.take_topic(),
@@ -198,7 +194,7 @@ impl Producer {
                     message_id: message.take_message_id(),
                     message_group,
                     delivery_timestamp,
-                    message_type,
+                    message_type: message.get_message_type() as i32,
                     born_host: HOST_NAME.clone(),
                     born_timestamp: born_timestamp.clone(),
                     body_digest: None,
@@ -241,6 +237,11 @@ impl Producer {
         &self,
         messages: Vec<impl message::Message>,
     ) -> Result<Vec<SendReceipt>, ClientError> {
+        let message_types = messages
+            .iter()
+            .map(|message| message.get_message_type())
+            .collect::<Vec<_>>();
+
         let (topic, message_group, mut pb_messages) =
             self.transform_messages_to_protobuf(messages)?;
 
@@ -252,6 +253,22 @@ impl Producer {
             select_message_queue(route)
         };
 
+        if self.option.validate_message_type() {
+            for message_type in message_types {
+                if !message_queue.accept_type(message_type) {
+                    return Err(ClientError::new(
+                        ErrorKind::MessageTypeNotMatch,
+                        format!(
+                            "Current message type {:?} not match with accepted 
types {:?}.",
+                            message_type, message_queue.accept_message_types
+                        )
+                        .as_str(),
+                        Self::OPERATION_SEND_MESSAGE,
+                    ));
+                }
+            }
+        }
+
         let endpoints =
             build_endpoints_by_message_queue(&message_queue, 
Self::OPERATION_SEND_MESSAGE)?;
         for message in pb_messages.iter_mut() {
@@ -298,7 +315,7 @@ mod tests {
     use crate::error::ErrorKind;
     use crate::log::terminal_logger;
     use crate::model::common::Route;
-    use crate::model::message::{MessageBuilder, MessageImpl};
+    use crate::model::message::{MessageBuilder, MessageImpl, MessageType};
     use crate::model::transaction::TransactionResolution;
     use crate::pb::{Broker, MessageQueue};
     use crate::session::Session;
@@ -424,6 +441,7 @@ mod tests {
             message_group: None,
             delivery_timestamp: None,
             transaction_enabled: false,
+            message_type: MessageType::TRANSACTION,
         }];
         let result = producer.transform_messages_to_protobuf(messages);
         assert!(result.is_err());
@@ -491,7 +509,7 @@ mod tests {
                             addresses: vec![],
                         }),
                     }),
-                    accept_message_types: vec![],
+                    accept_message_types: vec![MessageType::NORMAL as i32],
                 }],
             }))
         });
@@ -539,7 +557,7 @@ mod tests {
                             addresses: vec![],
                         }),
                     }),
-                    accept_message_types: vec![],
+                    accept_message_types: vec![MessageType::TRANSACTION as 
i32],
                 }],
             }))
         });
@@ -563,9 +581,7 @@ mod tests {
 
         let _ = producer
             .send_transaction_message(
-                MessageBuilder::builder()
-                    .set_topic("test_topic")
-                    .set_body(vec![])
+                MessageBuilder::transaction_message_builder("test_topic", 
vec![])
                     .build()
                     .unwrap(),
             )

Reply via email to