This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new 9fec02bc feat(rust): support changing invisible duration (#623) 9fec02bc is described below commit 9fec02bcf4711355532f953b8d892d2724c92544 Author: SSpirits <ad...@lv5.moe> AuthorDate: Mon Oct 16 11:07:38 2023 +0800 feat(rust): support changing invisible duration (#623) * feat(rust): support changing invisible duration Signed-off-by: SSpirits <ad...@lv5.moe> * fix(rust): fix clippy warning Signed-off-by: SSpirits <ad...@lv5.moe> * feat(rust): configure the example branch using features Signed-off-by: SSpirits <ad...@lv5.moe> * fix(rust): fix clippy warning Signed-off-by: SSpirits <ad...@lv5.moe> --------- Signed-off-by: SSpirits <ad...@lv5.moe> --- rust/Cargo.toml | 15 ++++-- rust/examples/producer.rs | 2 +- rust/examples/simple_consumer.rs | 42 ++++++++++++++--- rust/src/client.rs | 98 ++++++++++++++++++++++++++++++++++------ rust/src/model/common.rs | 3 +- rust/src/model/message.rs | 6 +-- rust/src/model/transaction.rs | 4 +- rust/src/session.rs | 31 +++++++++++-- rust/src/simple_consumer.rs | 13 ++++++ rust/src/util.rs | 2 +- 10 files changed, 177 insertions(+), 39 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 80af0bb9..6cc106bb 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -33,11 +33,11 @@ keywords = ["rocketmq", "api", "client", "sdk", "grpc"] [dependencies] tokio = { version = "1", features = ["full"] } -tokio-rustls = {version = "0.24.0", features = ["default", "dangerous_configuration"] } -tokio-stream="0.1.12" +tokio-rustls = { version = "0.24.0", features = ["default", "dangerous_configuration"] } +tokio-stream = "0.1.12" async-trait = "0.1.68" lazy_static = "1.4" -tonic = {version = "0.9.0", features = ["tls", "default", "channel", "tls-roots"]} +tonic = { version = "0.9.0", features = ["tls", "default", "channel", "tls-roots"] } prost = "0.11.8" prost-types = "0.11.8" @@ -47,7 +47,7 @@ parking_lot = "0.12" hostname = "0.3.1" os_info = "3" -slog = {version = "2.7.0", features=["max_level_trace", "release_max_level_info"]} +slog = { version = "2.7.0", features = ["max_level_trace", "release_max_level_info"] } slog-term = "2.9.0" slog-async = "2.7.0" slog-json = "2.6.1" @@ -63,7 +63,7 @@ time = "0.3" once_cell = "1.9.0" mockall = "0.11.4" -mockall_double= "0.3.0" +mockall_double = "0.3.0" siphasher = "0.3.10" ring = "0.16.20" @@ -78,3 +78,8 @@ regex = "1.7.3" wiremock-grpc = "0.0.3-alpha2" futures = "0.3" awaitility = "0.3.0" + +[features] +default = ["example_ack"] +example_ack = [] +example_change_invisible_duration = [] \ No newline at end of file diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs index 3e818e91..7179c9d2 100644 --- a/rust/examples/producer.rs +++ b/rust/examples/producer.rs @@ -59,7 +59,7 @@ async fn main() { // shutdown the producer when you don't need it anymore. // you should shutdown it manually to gracefully stop and unregister from server let shutdown_result = producer.shutdown().await; - if shutdown_result.is_ok() { + if shutdown_result.is_err() { eprintln!( "producer shutdown failed: {:?}", shutdown_result.unwrap_err() diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs index fc1a8388..d9bb06da 100644 --- a/rust/examples/simple_consumer.rs +++ b/rust/examples/simple_consumer.rs @@ -14,6 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#[cfg(feature = "example_change_invisible_duration")] +use std::time::Duration; + use rocketmq::conf::{ClientOption, SimpleConsumerOption}; use rocketmq::model::common::{FilterExpression, FilterType}; use rocketmq::SimpleConsumer; @@ -63,14 +66,39 @@ async fn main() { for message in messages { println!("receive message: {:?}", message); - // ack message to rocketmq proxy - let ack_result = consumer.ack(&message).await; - if ack_result.is_err() { - eprintln!( - "ack message {} failed: {:?}", - message.message_id(), - ack_result.unwrap_err() + + // Do your business logic here + // And then acknowledge the message to the RocketMQ proxy if everything is okay + #[cfg(feature = "example_ack")] + { + println!("ack message {}", message.message_id()); + let ack_result = consumer.ack(&message).await; + if ack_result.is_err() { + eprintln!( + "ack message {} failed: {:?}", + message.message_id(), + ack_result.unwrap_err() + ); + } + } + + // Otherwise, you can retry this message later by changing the invisible duration + #[cfg(feature = "example_change_invisible_duration")] + { + println!( + "Delay next visible time of message {} by 10s", + message.message_id() ); + let change_invisible_duration_result = consumer + .change_invisible_duration(&message, Duration::from_secs(10)) + .await; + if change_invisible_duration_result.is_err() { + eprintln!( + "change message {} invisible duration failed: {:?}", + message.message_id(), + change_invisible_duration_result.unwrap_err() + ); + } } } diff --git a/rust/src/client.rs b/rust/src/client.rs index 3804b4ae..ae034682 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -34,12 +34,12 @@ use crate::model::message::{AckMessageEntry, MessageView}; use crate::model::transaction::{TransactionChecker, TransactionResolution}; use crate::pb; use crate::pb::receive_message_response::Content; -use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand; +use crate::pb::telemetry_command::Command::{RecoverOrphanedTransactionCommand, Settings}; use crate::pb::{ - AckMessageRequest, AckMessageResultEntry, Code, EndTransactionRequest, FilterExpression, - HeartbeatRequest, HeartbeatResponse, Message, MessageQueue, NotifyClientTerminationRequest, - QueryRouteRequest, ReceiveMessageRequest, Resource, SendMessageRequest, Status, - TelemetryCommand, TransactionSource, + AckMessageRequest, AckMessageResultEntry, ChangeInvisibleDurationRequest, Code, + EndTransactionRequest, FilterExpression, HeartbeatRequest, HeartbeatResponse, Message, + MessageQueue, NotifyClientTerminationRequest, QueryRouteRequest, ReceiveMessageRequest, + Resource, SendMessageRequest, Status, TelemetryCommand, TransactionSource, }; #[double] use crate::session::SessionManager; @@ -282,6 +282,7 @@ impl Client { )) } } + Settings(_) => Ok(()), _ => Err(ClientError::new( ErrorKind::Config, "receive telemetry command but there is no handler", @@ -291,7 +292,6 @@ impl Client { }; } - #[allow(dead_code)] pub(crate) fn client_id(&self) -> &str { &self.id } @@ -378,7 +378,6 @@ impl Client { }) } - #[allow(dead_code)] pub(crate) async fn topic_route( &self, topic: &str, @@ -461,8 +460,7 @@ impl Client { let result = self.query_topic_route(rpc_client, topic).await; // send result to all waiters - if result.is_ok() { - let route = result.unwrap(); + if let Ok(route) = result { debug!( self.logger, "query route for topic={} success: route={:?}", topic, route @@ -518,7 +516,6 @@ impl Client { Ok(response) } - #[allow(dead_code)] pub(crate) async fn send_message( &self, endpoints: &Endpoints, @@ -547,7 +544,6 @@ impl Client { .collect()) } - #[allow(dead_code)] pub(crate) async fn receive_message( &self, endpoints: &Endpoints, @@ -608,7 +604,6 @@ impl Client { Ok(messages) } - #[allow(dead_code)] pub(crate) async fn ack_message<T: AckMessageEntry + 'static>( &self, ack_entry: &T, @@ -649,6 +644,51 @@ impl Client { Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?; Ok(response.entries) } + + pub(crate) async fn change_invisible_duration<T: AckMessageEntry + 'static>( + &self, + ack_entry: &T, + invisible_duration: Duration, + ) -> Result<String, ClientError> { + let result = self + .change_invisible_duration_inner( + self.get_session_with_endpoints(ack_entry.endpoints()) + .await + .unwrap(), + ack_entry.topic(), + ack_entry.receipt_handle(), + invisible_duration, + ack_entry.message_id(), + ) + .await?; + Ok(result) + } + + pub(crate) async fn change_invisible_duration_inner<T: RPCClient + 'static>( + &self, + mut rpc_client: T, + topic: String, + receipt_handle: String, + invisible_duration: Duration, + message_id: String, + ) -> Result<String, ClientError> { + let request = ChangeInvisibleDurationRequest { + group: Some(Resource { + name: self.option.group.as_ref().unwrap().to_string(), + resource_namespace: self.option.namespace.to_string(), + }), + topic: Some(Resource { + name: topic, + resource_namespace: self.option.namespace.to_string(), + }), + receipt_handle, + invisible_duration: Some(invisible_duration), + message_id, + }; + let response = rpc_client.change_invisible_duration(request).await?; + Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?; + Ok(response.receipt_handle) + } } #[cfg(test)] @@ -668,9 +708,10 @@ pub(crate) mod tests { use crate::model::transaction::TransactionResolution; use crate::pb::receive_message_response::Content; use crate::pb::{ - AckMessageEntry, AckMessageResponse, Code, EndTransactionResponse, FilterExpression, - HeartbeatResponse, Message, MessageQueue, QueryRouteResponse, ReceiveMessageResponse, - Resource, SendMessageResponse, Status, SystemProperties, TelemetryCommand, + AckMessageEntry, AckMessageResponse, ChangeInvisibleDurationResponse, Code, + EndTransactionResponse, FilterExpression, HeartbeatResponse, Message, MessageQueue, + QueryRouteResponse, ReceiveMessageResponse, Resource, SendMessageResponse, Status, + SystemProperties, TelemetryCommand, }; use crate::session; @@ -1045,6 +1086,33 @@ pub(crate) mod tests { assert_eq!(ack_result.unwrap().len(), 0); } + #[tokio::test] + async fn client_change_invisible_duration() { + let response = Ok(ChangeInvisibleDurationResponse { + status: Some(Status { + code: Code::Ok as i32, + message: "Success".to_string(), + }), + receipt_handle: "receipt_handle".to_string(), + }); + let mut mock = session::MockRPCClient::new(); + mock.expect_change_invisible_duration() + .return_once(|_| Box::pin(futures::future::ready(response))); + + let client = new_client_for_test(); + let change_invisible_duration_result = client + .change_invisible_duration_inner( + mock, + "test_topic".to_string(), + "receipt_handle".to_string(), + prost_types::Duration::default(), + "message_id".to_string(), + ) + .await; + assert!(change_invisible_duration_result.is_ok()); + assert_eq!(change_invisible_duration_result.unwrap(), "receipt_handle"); + } + #[tokio::test] async fn client_ack_message_failed() { let response = Ok(AckMessageResponse { diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs index 919d4b36..5dee554c 100644 --- a/rust/src/model/common.rs +++ b/rust/src/model/common.rs @@ -27,12 +27,13 @@ use crate::error::{ClientError, ErrorKind}; use crate::pb; use crate::pb::{Address, AddressScheme, MessageQueue}; -#[allow(dead_code)] #[derive(Debug, Clone)] pub(crate) enum ClientType { Producer = 1, + #[allow(dead_code)] PushConsumer = 2, SimpleConsumer = 3, + #[allow(dead_code)] PullConsumer = 4, } diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs index a2e9405a..28c232e8 100644 --- a/rust/src/model/message.rs +++ b/rust/src/model/message.rs @@ -59,7 +59,7 @@ impl Message for MessageImpl { } fn take_body(&mut self) -> Vec<u8> { - self.body.take().unwrap_or(vec![]) + self.body.take().unwrap_or_default() } fn take_tag(&mut self) -> Option<String> { @@ -67,11 +67,11 @@ impl Message for MessageImpl { } fn take_keys(&mut self) -> Vec<String> { - self.keys.take().unwrap_or(vec![]) + self.keys.take().unwrap_or_default() } fn take_properties(&mut self) -> HashMap<String, String> { - self.properties.take().unwrap_or(HashMap::new()) + self.properties.take().unwrap_or_default() } fn take_message_group(&mut self) -> Option<String> { diff --git a/rust/src/model/transaction.rs b/rust/src/model/transaction.rs index 2f74679f..2a40d7bf 100644 --- a/rust/src/model/transaction.rs +++ b/rust/src/model/transaction.rs @@ -102,11 +102,11 @@ impl TransactionImpl { #[async_trait] impl Transaction for TransactionImpl { async fn commit(mut self) -> Result<(), ClientError> { - return self.end_transaction(TransactionResolution::COMMIT).await; + self.end_transaction(TransactionResolution::COMMIT).await } async fn rollback(mut self) -> Result<(), ClientError> { - return self.end_transaction(TransactionResolution::ROLLBACK).await; + self.end_transaction(TransactionResolution::ROLLBACK).await } fn message_id(&self) -> &str { diff --git a/rust/src/session.rs b/rust/src/session.rs index 9441f7c5..d54894d8 100644 --- a/rust/src/session.rs +++ b/rust/src/session.rs @@ -33,7 +33,8 @@ use crate::error::ErrorKind; use crate::model::common::Endpoints; use crate::pb::telemetry_command::Command; use crate::pb::{ - AckMessageRequest, AckMessageResponse, EndTransactionRequest, EndTransactionResponse, + AckMessageRequest, AckMessageResponse, ChangeInvisibleDurationRequest, + ChangeInvisibleDurationResponse, EndTransactionRequest, EndTransactionResponse, HeartbeatRequest, HeartbeatResponse, NotifyClientTerminationRequest, NotifyClientTerminationResponse, QueryRouteRequest, QueryRouteResponse, ReceiveMessageRequest, ReceiveMessageResponse, SendMessageRequest, SendMessageResponse, TelemetryCommand, @@ -49,6 +50,7 @@ const OPERATION_HEARTBEAT: &str = "rpc.heartbeat"; const OPERATION_SEND_MESSAGE: &str = "rpc.send_message"; const OPERATION_RECEIVE_MESSAGE: &str = "rpc.receive_message"; const OPERATION_ACK_MESSAGE: &str = "rpc.ack_message"; +const OPERATION_CHANGE_INVISIBLE_DURATION: &str = "rpc.change_invisible_duration"; const OPERATION_END_TRANSACTION: &str = "rpc.end_transaction"; const OPERATION_NOTIFY_CLIENT_TERMINATION: &str = "rpc.notify_client_termination"; @@ -75,6 +77,10 @@ pub(crate) trait RPCClient { &mut self, request: AckMessageRequest, ) -> Result<AckMessageResponse, ClientError>; + async fn change_invisible_duration( + &mut self, + request: ChangeInvisibleDurationRequest, + ) -> Result<ChangeInvisibleDurationResponse, ClientError>; async fn end_transaction( &mut self, request: EndTransactionRequest, @@ -85,7 +91,6 @@ pub(crate) trait RPCClient { ) -> Result<NotifyClientTerminationResponse, ClientError>; } -#[allow(dead_code)] #[derive(Debug)] pub(crate) struct Session { logger: Logger, @@ -353,7 +358,6 @@ impl Session { } } - #[allow(dead_code)] pub(crate) fn is_started(&self) -> bool { self.shutdown_tx.is_some() } @@ -489,6 +493,26 @@ impl RPCClient for Session { Ok(response.into_inner()) } + async fn change_invisible_duration( + &mut self, + request: ChangeInvisibleDurationRequest, + ) -> Result<ChangeInvisibleDurationResponse, ClientError> { + let request = self.sign(request); + let response = self + .stub + .change_invisible_duration(request) + .await + .map_err(|e| { + ClientError::new( + ErrorKind::ClientInternal, + "send rpc change_invisible_duration failed", + OPERATION_CHANGE_INVISIBLE_DURATION, + ) + .set_source(e) + })?; + Ok(response.into_inner()) + } + async fn end_transaction( &mut self, request: EndTransactionRequest, @@ -571,7 +595,6 @@ impl SessionManager { }; } - #[allow(dead_code)] pub(crate) async fn get_all_sessions(&self) -> Result<Vec<Session>, ClientError> { let session_map = self.session_map.lock().await; let mut sessions = Vec::new(); diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs index a8777053..f8a6eace 100644 --- a/rust/src/simple_consumer.rs +++ b/rust/src/simple_consumer.rs @@ -175,6 +175,19 @@ impl SimpleConsumer { self.client.ack_message(ack_entry).await?; Ok(()) } + + pub async fn change_invisible_duration( + &self, + ack_entry: &(impl AckMessageEntry + 'static), + invisible_duration: Duration, + ) -> Result<String, ClientError> { + self.client + .change_invisible_duration( + ack_entry, + prost_types::Duration::try_from(invisible_duration).unwrap(), + ) + .await + } } #[cfg(test)] diff --git a/rust/src/util.rs b/rust/src/util.rs index e25d2622..6ea92cb5 100644 --- a/rust/src/util.rs +++ b/rust/src/util.rs @@ -92,7 +92,7 @@ pub(crate) fn build_producer_settings( let topics = option .topics() .clone() - .unwrap_or(vec![]) + .unwrap_or_default() .iter() .map(|topic| Resource { name: topic.to_string(),