This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new 65b70ed7 [rust]support message type check on rust client (#658) 65b70ed7 is described below commit 65b70ed73259e8667e75a94e5162f75e91b4bf1b Author: Qiping Luo <rocky55...@163.com> AuthorDate: Thu Jan 25 14:51:24 2024 +0800 [rust]support message type check on rust client (#658) * support message type check on rust client * update test cases * simplify MessageType conversion * remove unused function * fix code fmt issue * fix cargo fmt issue * Use strong type in application code and primitive type for wire data only Signed-off-by: Li Zhanhui <lizhan...@gmail.com> --------- Signed-off-by: Li Zhanhui <lizhan...@gmail.com> Co-authored-by: Li Zhanhui <lizhan...@gmail.com> --- rust/src/error.rs | 3 +++ rust/src/model/message.rs | 29 +++++++++++++++++++++++++++++ rust/src/producer.rs | 46 +++++++++++++++++++++++++++++++--------------- 3 files changed, 63 insertions(+), 15 deletions(-) diff --git a/rust/src/error.rs b/rust/src/error.rs index 59d1eeb4..5210842e 100644 --- a/rust/src/error.rs +++ b/rust/src/error.rs @@ -33,6 +33,9 @@ pub enum ErrorKind { #[error("Message is invalid")] InvalidMessage, + #[error("Message type not match with topic accept message type")] + MessageTypeNotMatch, + #[error("Server error")] Server, diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs index 28c232e8..1defcd89 100644 --- a/rust/src/model/message.rs +++ b/rust/src/model/message.rs @@ -21,9 +21,18 @@ use std::collections::HashMap; use crate::error::{ClientError, ErrorKind}; use crate::model::common::Endpoints; +use crate::model::message::MessageType::{DELAY, FIFO, NORMAL, TRANSACTION}; use crate::model::message_id::UNIQ_ID_GENERATOR; use crate::pb; +#[derive(Clone, Copy, Debug)] +pub enum MessageType { + NORMAL = 1, + FIFO = 2, + DELAY = 3, + TRANSACTION = 4, +} + /// [`Message`] is the data model for sending. 
pub trait Message { fn take_message_id(&mut self) -> String; @@ -35,6 +44,17 @@ pub trait Message { fn take_message_group(&mut self) -> Option<String>; fn take_delivery_timestamp(&mut self) -> Option<i64>; fn transaction_enabled(&mut self) -> bool; + fn get_message_type(&self) -> MessageType; +} + +pub trait MessageTypeAware { + fn accept_type(&self, message_type: MessageType) -> bool; +} + +impl MessageTypeAware for pb::MessageQueue { + fn accept_type(&self, message_type: MessageType) -> bool { + self.accept_message_types.contains(&(message_type as i32)) + } } pub(crate) struct MessageImpl { @@ -47,6 +67,7 @@ pub(crate) struct MessageImpl { pub(crate) message_group: Option<String>, pub(crate) delivery_timestamp: Option<i64>, pub(crate) transaction_enabled: bool, + pub(crate) message_type: MessageType, } impl Message for MessageImpl { @@ -85,6 +106,10 @@ impl Message for MessageImpl { fn transaction_enabled(&mut self) -> bool { self.transaction_enabled } + + fn get_message_type(&self) -> MessageType { + self.message_type + } } /// [`MessageBuilder`] is the builder for [`Message`]. 
@@ -108,6 +133,7 @@ impl MessageBuilder { message_group: None, delivery_timestamp: None, transaction_enabled: false, + message_type: NORMAL, }, } } @@ -135,6 +161,7 @@ impl MessageBuilder { message_group: Some(message_group.into()), delivery_timestamp: None, transaction_enabled: false, + message_type: FIFO, }, } } @@ -162,6 +189,7 @@ impl MessageBuilder { message_group: None, delivery_timestamp: Some(delay_time), transaction_enabled: false, + message_type: DELAY, }, } } @@ -184,6 +212,7 @@ impl MessageBuilder { message_group: None, delivery_timestamp: None, transaction_enabled: true, + message_type: TRANSACTION, }, } } diff --git a/rust/src/producer.rs b/rust/src/producer.rs index 7e3f3996..2a69f079 100644 --- a/rust/src/producer.rs +++ b/rust/src/producer.rs @@ -26,9 +26,9 @@ use crate::client::Client; use crate::conf::{ClientOption, ProducerOption}; use crate::error::{ClientError, ErrorKind}; use crate::model::common::{ClientType, SendReceipt}; -use crate::model::message; +use crate::model::message::{self, MessageTypeAware}; use crate::model::transaction::{Transaction, TransactionChecker, TransactionImpl}; -use crate::pb::{Encoding, MessageType, Resource, SystemProperties}; +use crate::pb::{Encoding, Resource, SystemProperties}; use crate::util::{ build_endpoints_by_message_queue, build_producer_settings, select_message_queue, select_message_queue_by_message_group, HOST_NAME, @@ -172,20 +172,16 @@ impl Producer { .take_delivery_timestamp() .map(|seconds| Timestamp { seconds, nanos: 0 }); - let message_type = if message.transaction_enabled() { + if message.transaction_enabled() { message_group = None; delivery_timestamp = None; - MessageType::Transaction as i32 } else if delivery_timestamp.is_some() { message_group = None; - MessageType::Delay as i32 } else if message_group.is_some() { delivery_timestamp = None; - MessageType::Fifo as i32 - } else { - MessageType::Normal as i32 }; + // TODO: use a converter trait From or TryFrom let pb_message = pb::Message { 
topic: Some(Resource { name: message.take_topic(), @@ -198,7 +194,7 @@ impl Producer { message_id: message.take_message_id(), message_group, delivery_timestamp, - message_type, + message_type: message.get_message_type() as i32, born_host: HOST_NAME.clone(), born_timestamp: born_timestamp.clone(), body_digest: None, @@ -241,6 +237,11 @@ impl Producer { &self, messages: Vec<impl message::Message>, ) -> Result<Vec<SendReceipt>, ClientError> { + let message_types = messages + .iter() + .map(|message| message.get_message_type()) + .collect::<Vec<_>>(); + let (topic, message_group, mut pb_messages) = self.transform_messages_to_protobuf(messages)?; @@ -252,6 +253,22 @@ impl Producer { select_message_queue(route) }; + if self.option.validate_message_type() { + for message_type in message_types { + if !message_queue.accept_type(message_type) { + return Err(ClientError::new( + ErrorKind::MessageTypeNotMatch, + format!( + "Current message type {:?} not match with accepted types {:?}.", + message_type, message_queue.accept_message_types + ) + .as_str(), + Self::OPERATION_SEND_MESSAGE, + )); + } + } + } + let endpoints = build_endpoints_by_message_queue(&message_queue, Self::OPERATION_SEND_MESSAGE)?; for message in pb_messages.iter_mut() { @@ -298,7 +315,7 @@ mod tests { use crate::error::ErrorKind; use crate::log::terminal_logger; use crate::model::common::Route; - use crate::model::message::{MessageBuilder, MessageImpl}; + use crate::model::message::{MessageBuilder, MessageImpl, MessageType}; use crate::model::transaction::TransactionResolution; use crate::pb::{Broker, MessageQueue}; use crate::session::Session; @@ -424,6 +441,7 @@ mod tests { message_group: None, delivery_timestamp: None, transaction_enabled: false, + message_type: MessageType::TRANSACTION, }]; let result = producer.transform_messages_to_protobuf(messages); assert!(result.is_err()); @@ -491,7 +509,7 @@ mod tests { addresses: vec![], }), }), - accept_message_types: vec![], + accept_message_types: 
vec![MessageType::NORMAL as i32], }], })) }); @@ -539,7 +557,7 @@ mod tests { addresses: vec![], }), }), - accept_message_types: vec![], + accept_message_types: vec![MessageType::TRANSACTION as i32], }], })) }); @@ -563,9 +581,7 @@ mod tests { let _ = producer .send_transaction_message( - MessageBuilder::builder() - .set_topic("test_topic") - .set_body(vec![]) + MessageBuilder::transaction_message_builder("test_topic", vec![]) .build() .unwrap(), )