This is an automated email from the ASF dual-hosted git repository.

sspirits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 430ed16c [rust]sync client settings periodically (#691)
430ed16c is described below

commit 430ed16c13c407c56a096c599a38cad383b223dd
Author: Qiping Luo <qiping...@tencent.com>
AuthorDate: Tue Mar 12 15:54:31 2024 +0800

    [rust]sync client settings periodically (#691)
    
    * [rust]sync client settings periodically
    
    * fix ugly import
    
    * reuse existing update_settings function
    
    * simplify code
    
    * fix: avoid unwrap in main code
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: fix tests
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    ---------
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    Co-authored-by: Li Zhanhui <lizhan...@gmail.com>
---
 rust/src/client.rs            | 164 ++++++++++++++++++++++++++----------------
 rust/src/conf.rs              |  36 ++++++++++
 rust/src/model/transaction.rs |   4 +-
 rust/src/producer.rs          |  46 ++++++------
 rust/src/session.rs           |   9 +--
 rust/src/simple_consumer.rs   |  15 ++--
 rust/src/util.rs              |  41 +++++------
 7 files changed, 192 insertions(+), 123 deletions(-)

diff --git a/rust/src/client.rs b/rust/src/client.rs
index 884f98b0..4183256f 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -26,9 +26,10 @@ use parking_lot::Mutex;
 use prost_types::Duration;
 use slog::{debug, error, info, o, warn, Logger};
 use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, RwLock};
+use tokio::time::Instant;
 
-use crate::conf::ClientOption;
+use crate::conf::{ClientOption, SettingsAware};
 use crate::error::{ClientError, ErrorKind};
 use crate::model::common::{ClientType, Endpoints, Route, RouteStatus, 
SendReceipt};
 use crate::model::message::AckMessageEntry;
@@ -44,14 +45,14 @@ use crate::pb::{
 use crate::session::SessionManager;
 use crate::session::{RPCClient, Session};
 
-pub(crate) struct Client {
+pub(crate) struct Client<S> {
     logger: Logger,
     option: ClientOption,
     session_manager: Arc<SessionManager>,
     route_table: Mutex<HashMap<String /* topic */, RouteStatus>>,
     id: String,
     access_endpoints: Endpoints,
-    settings: TelemetryCommand,
+    settings: Arc<RwLock<S>>,
     telemetry_command_tx: Option<mpsc::Sender<pb::telemetry_command::Command>>,
     shutdown_tx: Option<oneshot::Sender<()>>,
 }
@@ -68,7 +69,10 @@ const OPERATION_SEND_MESSAGE: &str = "client.send_message";
 const OPERATION_RECEIVE_MESSAGE: &str = "client.receive_message";
 const OPERATION_ACK_MESSAGE: &str = "client.ack_message";
 
-impl Debug for Client {
+impl<S> Debug for Client<S>
+where
+    S: SettingsAware + 'static,
+{
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("Client")
             .field("id", &self.id)
@@ -79,11 +83,14 @@ impl Debug for Client {
 }
 
 #[automock]
-impl Client {
+impl<S> Client<S>
+where
+    S: SettingsAware + 'static + Send + Sync,
+{
     pub(crate) fn new(
         logger: &Logger,
         option: ClientOption,
-        settings: TelemetryCommand,
+        settings: Arc<RwLock<S>>,
     ) -> Result<Self, ClientError> {
         let id = Self::generate_client_id();
         let endpoints = Endpoints::from_url(option.access_url())
@@ -131,12 +138,16 @@ impl Client {
             .await
             .map_err(|error| error.with_operation(OPERATION_CLIENT_START))?;
 
+        let settings = Arc::clone(&self.settings);
         tokio::spawn(async move {
             rpc_client.is_started();
-            let mut interval = 
tokio::time::interval(std::time::Duration::from_secs(30));
+            let seconds_30 = std::time::Duration::from_secs(30);
+            let mut heartbeat_interval = tokio::time::interval(seconds_30);
+            let mut sync_settings_interval =
+                tokio::time::interval_at(Instant::now() + seconds_30, 
seconds_30);
             loop {
                 select! {
-                    _ = interval.tick() => {
+                    _ = heartbeat_interval.tick() => {
                         let sessions = 
session_manager.get_all_sessions().await;
                         if sessions.is_err() {
                             error!(
@@ -159,7 +170,7 @@ impl Client {
                                 continue;
                             }
                             let result =
-                                
Self::handle_response_status(response.unwrap().status, OPERATION_HEARTBEAT);
+                                
handle_response_status(response.unwrap().status, OPERATION_HEARTBEAT);
                             if result.is_err() {
                                 error!(
                                     logger,
@@ -171,13 +182,34 @@ impl Client {
                             debug!(logger,"send heartbeat to server success, 
peer={}",peer);
                         }
                     },
+                    _ = sync_settings_interval.tick() => {
+                        let sessions = 
session_manager.get_all_sessions().await;
+                        if sessions.is_err() {
+                            error!(logger, "sync settings failed: failed to 
get sessions: {}", sessions.unwrap_err());
+                            continue;
+                        }
+                        for mut session in sessions.unwrap() {
+                            let command;
+                            {
+                                command = 
settings.read().await.build_telemetry_command();
+                            }
+                            let peer = session.peer().to_string();
+                            let result = 
session.update_settings(command).await;
+                            if result.is_err() {
+                                error!(logger, "sync settings failed: failed 
to call rpc: {}", result.unwrap_err());
+                                continue;
+                            }
+                            debug!(logger, "sync settings success, peer = {}", 
peer);
+                        }
+
+                    },
                     _ = &mut shutdown_rx => {
-                        info!(logger, "receive shutdown signal, stop heartbeat 
task.");
+                        info!(logger, "receive shutdown signal, stop heartbeat 
and telemetry tasks.");
                         break;
                     }
                 }
             }
-            info!(logger, "heartbeat task is stopped");
+            info!(logger, "heartbeat and telemetry task were stopped");
         });
         Ok(())
     }
@@ -206,7 +238,7 @@ impl Client {
             resource_namespace: self.option.namespace.to_string(),
         });
         let response = 
rpc_client.notify_shutdown(NotifyClientTerminationRequest { group });
-        Self::handle_response_status(response.await?.status, 
OPERATION_CLIENT_SHUTDOWN)?;
+        handle_response_status(response.await?.status, 
OPERATION_CLIENT_SHUTDOWN)?;
         self.session_manager.shutdown().await;
         Ok(())
     }
@@ -234,13 +266,17 @@ impl Client {
         )
     }
 
+    async fn build_telemetry_command(&self) -> TelemetryCommand {
+        self.settings.read().await.build_telemetry_command()
+    }
+
     pub(crate) async fn get_session(&self) -> Result<Session, ClientError> {
         self.check_started(OPERATION_GET_SESSION)?;
         let session = self
             .session_manager
             .get_or_create_session(
                 &self.access_endpoints,
-                self.settings.clone(),
+                self.build_telemetry_command().await,
                 self.telemetry_command_tx.clone().unwrap(),
             )
             .await?;
@@ -255,37 +291,13 @@ impl Client {
             .session_manager
             .get_or_create_session(
                 endpoints,
-                self.settings.clone(),
+                self.build_telemetry_command().await,
                 self.telemetry_command_tx.clone().unwrap(),
             )
             .await?;
         Ok(session)
     }
 
-    pub(crate) fn handle_response_status(
-        status: Option<Status>,
-        operation: &'static str,
-    ) -> Result<(), ClientError> {
-        if status.is_none() {
-            return Err(ClientError::new(
-                ErrorKind::Server,
-                "server do not return status, this may be a bug",
-                operation,
-            ));
-        }
-
-        let status = status.unwrap();
-        let status_code = Code::from_i32(status.code).unwrap();
-        if !status_code.eq(&Code::Ok) {
-            return Err(
-                ClientError::new(ErrorKind::Server, "server return an error", 
operation)
-                    .with_context("code", status_code.as_str_name())
-                    .with_context("message", status.message),
-            );
-        }
-        Ok(())
-    }
-
     pub(crate) fn topic_route_from_cache(&self, topic: &str) -> 
Option<Arc<Route>> {
         self.route_table.lock().get(topic).and_then(|route_status| {
             if let RouteStatus::Found(route) = route_status {
@@ -325,7 +337,7 @@ impl Client {
         };
 
         let response = rpc_client.query_route(request).await?;
-        Self::handle_response_status(response.status, OPERATION_QUERY_ROUTE)?;
+        handle_response_status(response.status, OPERATION_QUERY_ROUTE)?;
 
         let route = Route {
             index: AtomicUsize::new(0),
@@ -454,7 +466,7 @@ impl Client {
     ) -> Result<Vec<SendReceipt>, ClientError> {
         let request = SendMessageRequest { messages };
         let response = rpc_client.send_message(request).await?;
-        Self::handle_response_status(response.status, OPERATION_SEND_MESSAGE)?;
+        handle_response_status(response.status, OPERATION_SEND_MESSAGE)?;
 
         Ok(response
             .entries
@@ -512,7 +524,7 @@ impl Client {
                     if status.code() == Code::MessageNotFound {
                         return Ok(vec![]);
                     }
-                    Self::handle_response_status(Some(status), 
OPERATION_RECEIVE_MESSAGE)?;
+                    handle_response_status(Some(status), 
OPERATION_RECEIVE_MESSAGE)?;
                 }
                 Content::Message(message) => {
                     messages.push(message);
@@ -560,7 +572,7 @@ impl Client {
             entries,
         };
         let response = rpc_client.ack_message(request).await?;
-        Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
+        handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
         Ok(response.entries)
     }
 
@@ -605,11 +617,31 @@ impl Client {
             message_id,
         };
         let response = rpc_client.change_invisible_duration(request).await?;
-        Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
+        handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
         Ok(response.receipt_handle)
     }
 }
 
+pub fn handle_response_status(
+    status: Option<Status>,
+    operation: &'static str,
+) -> Result<(), ClientError> {
+    let status = status.ok_or(ClientError::new(
+        ErrorKind::Server,
+        "server do not return status, this may be a bug",
+        operation,
+    ))?;
+
+    if status.code != Code::Ok as i32 {
+        return Err(
+            ClientError::new(ErrorKind::Server, "server return an error", 
operation)
+                .with_context("code", format!("{}", status.code))
+                .with_context("message", status.message),
+        );
+    }
+    Ok(())
+}
+
 #[cfg(test)]
 pub(crate) mod tests {
     use std::sync::atomic::{AtomicUsize, Ordering};
@@ -624,7 +656,6 @@ pub(crate) mod tests {
     use crate::error::{ClientError, ErrorKind};
     use crate::log::terminal_logger;
     use crate::model::common::{ClientType, Route};
-    use crate::pb::receive_message_response::Content;
     use crate::pb::{
         AckMessageEntry, AckMessageResponse, ChangeInvisibleDurationResponse, 
Code,
         FilterExpression, HeartbeatResponse, Message, MessageQueue, 
QueryRouteResponse,
@@ -637,7 +668,16 @@ pub(crate) mod tests {
     // The lock is used to prevent the mocking static function at same time 
during parallel testing.
     pub(crate) static MTX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
 
-    fn new_client_for_test() -> Client {
+    #[derive(Default)]
+    struct MockSettings {}
+
+    impl SettingsAware for MockSettings {
+        fn build_telemetry_command(&self) -> TelemetryCommand {
+            TelemetryCommand::default()
+        }
+    }
+
+    fn new_client_for_test() -> Client<MockSettings> {
         Client {
             logger: terminal_logger(),
             option: ClientOption {
@@ -646,24 +686,24 @@ pub(crate) mod tests {
             },
             session_manager: Arc::new(SessionManager::default()),
             route_table: Mutex::new(HashMap::new()),
-            id: Client::generate_client_id(),
+            id: Client::<MockSettings>::generate_client_id(),
             access_endpoints: 
Endpoints::from_url("http://localhost:8081").unwrap(),
-            settings: TelemetryCommand::default(),
+            settings: Arc::new(RwLock::new(MockSettings::default())),
             telemetry_command_tx: None,
             shutdown_tx: None,
         }
     }
 
-    fn new_client_with_session_manager(session_manager: SessionManager) -> 
Client {
+    fn new_client_with_session_manager(session_manager: SessionManager) -> 
Client<MockSettings> {
         let (tx, _) = mpsc::channel(16);
         Client {
             logger: terminal_logger(),
             option: ClientOption::default(),
             session_manager: Arc::new(session_manager),
             route_table: Mutex::new(HashMap::new()),
-            id: Client::generate_client_id(),
+            id: Client::<MockSettings>::generate_client_id(),
             access_endpoints: 
Endpoints::from_url("http://localhost:8081").unwrap(),
-            settings: TelemetryCommand::default(),
+            settings: Arc::new(RwLock::new(MockSettings::default())),
             telemetry_command_tx: Some(tx),
             shutdown_tx: None,
         }
@@ -684,7 +724,7 @@ pub(crate) mod tests {
         Client::new(
             &terminal_logger(),
             ClientOption::default(),
-            TelemetryCommand::default(),
+            Arc::new(RwLock::new(MockSettings::default())),
         )?;
         Ok(())
     }
@@ -728,8 +768,8 @@ pub(crate) mod tests {
     }
 
     #[test]
-    fn handle_response_status() {
-        let result = Client::handle_response_status(None, "test");
+    fn test_handle_response_status() {
+        let result = handle_response_status(None, "test");
         assert!(result.is_err(), "should return error when status is None");
         let result = result.unwrap_err();
         assert_eq!(result.kind, ErrorKind::Server);
@@ -739,7 +779,7 @@ pub(crate) mod tests {
         );
         assert_eq!(result.operation, "test");
 
-        let result = Client::handle_response_status(
+        let result = handle_response_status(
             Some(Status {
                 code: Code::BadRequest as i32,
                 message: "test failed".to_string(),
@@ -757,12 +797,12 @@ pub(crate) mod tests {
         assert_eq!(
             result.context,
             vec![
-                ("code", "BAD_REQUEST".to_string()),
+                ("code", format!("{}", Code::BadRequest as i32)),
                 ("message", "test failed".to_string()),
             ]
         );
 
-        let result = Client::handle_response_status(
+        let result = handle_response_status(
             Some(Status {
                 code: Code::Ok as i32,
                 message: "test success".to_string(),
@@ -897,9 +937,13 @@ pub(crate) mod tests {
         mock.expect_heartbeat()
             .return_once(|_| Box::pin(futures::future::ready(response)));
 
-        let send_result =
-            Client::heart_beat_inner(mock, &Some("group".to_string()), "", 
&ClientType::Producer)
-                .await;
+        let send_result = Client::<MockSettings>::heart_beat_inner(
+            mock,
+            &Some("group".to_string()),
+            "",
+            &ClientType::Producer,
+        )
+        .await;
         assert!(send_result.is_ok());
     }
 
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index fa16a41a..8b270612 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -20,10 +20,12 @@
 use std::time::Duration;
 
 use crate::model::common::ClientType;
+use crate::pb::TelemetryCommand;
 #[allow(unused_imports)]
 use crate::producer::Producer;
 #[allow(unused_imports)]
 use crate::simple_consumer::SimpleConsumer;
+use crate::util::{build_producer_settings, build_simple_consumer_settings};
 
 /// [`ClientOption`] is the configuration of internal client, which manages 
the connection and request with RocketMQ proxy.
 #[derive(Debug, Clone)]
@@ -130,6 +132,7 @@ pub struct ProducerOption {
     topics: Option<Vec<String>>,
     namespace: String,
     validate_message_type: bool,
+    timeout: Duration,
 }
 
 impl Default for ProducerOption {
@@ -140,6 +143,7 @@ impl Default for ProducerOption {
             topics: None,
             namespace: "".to_string(),
             validate_message_type: true,
+            timeout: Duration::from_secs(3),
         }
     }
 }
@@ -188,6 +192,10 @@ impl ProducerOption {
     pub fn set_validate_message_type(&mut self, validate_message_type: bool) {
         self.validate_message_type = validate_message_type;
     }
+
+    pub fn timeout(&self) -> &Duration {
+        &self.timeout
+    }
 }
 
 /// The configuration of [`SimpleConsumer`].
@@ -198,6 +206,8 @@ pub struct SimpleConsumerOption {
     prefetch_route: bool,
     topics: Option<Vec<String>>,
     namespace: String,
+    timeout: Duration,
+    long_polling_timeout: Duration,
 }
 
 impl Default for SimpleConsumerOption {
@@ -208,6 +218,8 @@ impl Default for SimpleConsumerOption {
             prefetch_route: true,
             topics: None,
             namespace: "".to_string(),
+            timeout: Duration::from_secs(3),
+            long_polling_timeout: Duration::from_secs(40),
         }
     }
 }
@@ -256,6 +268,30 @@ impl SimpleConsumerOption {
     pub(crate) fn set_namespace(&mut self, name_space: impl Into<String>) {
         self.namespace = name_space.into();
     }
+
+    pub fn timeout(&self) -> &Duration {
+        &self.timeout
+    }
+
+    pub fn long_polling_timeout(&self) -> &Duration {
+        &self.long_polling_timeout
+    }
+}
+
+pub trait SettingsAware {
+    fn build_telemetry_command(&self) -> TelemetryCommand;
+}
+
+impl SettingsAware for ProducerOption {
+    fn build_telemetry_command(&self) -> TelemetryCommand {
+        build_producer_settings(self)
+    }
+}
+
+impl SettingsAware for SimpleConsumerOption {
+    fn build_telemetry_command(&self) -> TelemetryCommand {
+        build_simple_consumer_settings(self)
+    }
 }
 
 #[cfg(test)]
diff --git a/rust/src/model/transaction.rs b/rust/src/model/transaction.rs
index 2a40d7bf..0a4c6871 100644
--- a/rust/src/model/transaction.rs
+++ b/rust/src/model/transaction.rs
@@ -21,7 +21,7 @@ use std::fmt::{Debug, Formatter};
 
 use async_trait::async_trait;
 
-use crate::client::Client;
+use crate::client::handle_response_status;
 use crate::error::ClientError;
 use crate::model::common::SendReceipt;
 use crate::model::message::MessageView;
@@ -95,7 +95,7 @@ impl TransactionImpl {
                 trace_context: "".to_string(),
             })
             .await?;
-        Client::handle_response_status(response.status, "end transaction")
+        handle_response_status(response.status, "end transaction")
     }
 }
 
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index e456cbe7..26ba496a 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -26,6 +26,7 @@ use tokio::select;
 use tokio::sync::RwLock;
 use tokio::sync::{mpsc, oneshot};
 
+use crate::client::handle_response_status;
 #[double]
 use crate::client::Client;
 use crate::conf::{ClientOption, ProducerOption};
@@ -40,8 +41,8 @@ use 
crate::pb::telemetry_command::Command::{RecoverOrphanedTransactionCommand, S
 use crate::pb::{Encoding, EndTransactionRequest, Resource, SystemProperties, 
TransactionSource};
 use crate::session::RPCClient;
 use crate::util::{
-    build_endpoints_by_message_queue, build_producer_settings, 
select_message_queue,
-    select_message_queue_by_message_group, HOST_NAME,
+    build_endpoints_by_message_queue, select_message_queue, 
select_message_queue_by_message_group,
+    HOST_NAME,
 };
 use crate::{log, pb};
 
@@ -54,7 +55,7 @@ use crate::{log, pb};
 pub struct Producer {
     option: Arc<RwLock<ProducerOption>>,
     logger: Logger,
-    client: Client,
+    client: Client<ProducerOption>,
     transaction_checker: Option<Box<TransactionChecker>>,
     shutdown_tx: Option<oneshot::Sender<()>>,
 }
@@ -85,9 +86,8 @@ impl Producer {
             ..client_option
         };
         let logger = log::logger(option.logging_format());
-        let settings = build_producer_settings(&option, &client_option);
-        let client = Client::new(&logger, client_option, settings)?;
         let option = Arc::new(RwLock::new(option));
+        let client = Client::new(&logger, client_option, Arc::clone(&option))?;
         Ok(Producer {
             option,
             logger,
@@ -115,9 +115,8 @@ impl Producer {
             ..client_option
         };
         let logger = log::logger(option.logging_format());
-        let settings = build_producer_settings(&option, &client_option);
-        let client = Client::new(&logger, client_option, settings)?;
         let option = Arc::new(RwLock::new(option));
+        let client = Client::<ProducerOption>::new(&logger, client_option, 
Arc::clone(&option))?;
         Ok(Producer {
             option,
             logger,
@@ -128,9 +127,7 @@ impl Producer {
     }
 
     async fn get_resource_namespace(&self) -> String {
-        let option_guard = self.option.read();
-        let resource_namespace = option_guard.await.namespace().to_string();
-        resource_namespace
+        self.option.read().await.namespace().to_string()
     }
 
     /// Start the producer
@@ -139,14 +136,15 @@ impl Producer {
         let telemetry_command_tx: mpsc::Sender<pb::telemetry_command::Command> 
=
             telemetry_command_tx;
         self.client.start(telemetry_command_tx).await?;
-        let option_guard = self.option.read().await;
-        let topics = option_guard.topics();
-        if let Some(topics) = topics {
-            for topic in topics {
-                self.client.topic_route(topic, true).await?;
+        {
+            let option_guard = self.option.read().await;
+            let topics = option_guard.topics();
+            if let Some(topics) = topics {
+                for topic in topics {
+                    self.client.topic_route(topic, true).await?;
+                }
             }
         }
-        drop(option_guard);
         let transaction_checker = self.transaction_checker.take();
         if transaction_checker.is_some() {
             self.transaction_checker = Some(Box::new(|_, _| 
TransactionResolution::UNKNOWN));
@@ -245,7 +243,7 @@ impl Producer {
                     trace_context: "".to_string(),
                 })
                 .await?;
-            Client::handle_response_status(response.status, 
Self::OPERATION_END_TRANSACTION)
+            handle_response_status(response.status, 
Self::OPERATION_END_TRANSACTION)
         } else {
             Err(ClientError::new(
                 ErrorKind::Config,
@@ -396,9 +394,7 @@ impl Producer {
             select_message_queue(route)
         };
 
-        let option_guard = self.option.read().await;
-        let validate_message_type = option_guard.validate_message_type();
-        drop(option_guard);
+        let validate_message_type = self.validate_message_type().await;
         if validate_message_type {
             for message_type in message_types {
                 if !message_queue.accept_type(message_type) {
@@ -424,6 +420,10 @@ impl Producer {
         self.client.send_message(&endpoints, pb_messages).await
     }
 
+    async fn validate_message_type(&self) -> bool {
+        self.option.read().await.validate_message_type()
+    }
+
     pub fn has_transaction_checker(&self) -> bool {
         self.transaction_checker.is_some()
     }
@@ -500,7 +500,7 @@ mod tests {
     async fn producer_start() -> Result<(), ClientError> {
         let _m = crate::client::tests::MTX.lock();
 
-        let ctx = Client::new_context();
+        let ctx = Client::<ProducerOption>::new_context();
         ctx.expect().return_once(|_, _, _| {
             let mut client = Client::default();
             client.expect_topic_route().returning(|_, _| {
@@ -533,7 +533,7 @@ mod tests {
     async fn transaction_producer_start() -> Result<(), ClientError> {
         let _m = crate::client::tests::MTX.lock();
 
-        let ctx = Client::new_context();
+        let ctx = Client::<ProducerOption>::new_context();
         ctx.expect().return_once(|_, _, _| {
             let mut client = Client::default();
             client.expect_topic_route().returning(|_, _| {
@@ -774,8 +774,6 @@ mod tests {
         mock.expect_end_transaction()
             .return_once(|_| Box::pin(futures::future::ready(response)));
 
-        let context = MockClient::handle_response_status_context();
-        context.expect().return_once(|_, _| Result::Ok(()));
         let result = Producer::handle_recover_orphaned_transaction_command(
             mock,
             pb::RecoverOrphanedTransactionCommand {
diff --git a/rust/src/session.rs b/rust/src/session.rs
index c69e9ee9..7b2643c8 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -110,7 +110,7 @@ impl Session {
             option: self.option.clone(),
             endpoints: self.endpoints.clone(),
             stub: self.stub.clone(),
-            telemetry_tx: None,
+            telemetry_tx: self.telemetry_tx.clone(),
             shutdown_tx: None,
         }
     }
@@ -686,10 +686,7 @@ mod tests {
 
         let (tx, _) = mpsc::channel(16);
         let result = session
-            .start(
-                build_producer_settings(&ProducerOption::default(), 
&ClientOption::default()),
-                tx,
-            )
+            .start(build_producer_settings(&ProducerOption::default()), tx)
             .await;
         assert!(result.is_ok());
         assert!(session.is_started());
@@ -714,7 +711,7 @@ mod tests {
         let session = session_manager
             .get_or_create_session(
                 &Endpoints::from_url(&format!("localhost:{}", 
server.address().port())).unwrap(),
-                build_producer_settings(&ProducerOption::default(), 
&client_option),
+                build_producer_settings(&ProducerOption::default()),
                 tx,
             )
             .await
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index e891d384..8d655000 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -15,12 +15,13 @@
  * limitations under the License.
  */
 
+use std::sync::Arc;
 use std::time::Duration;
 
 use mockall_double::double;
 use slog::{info, warn, Logger};
 use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, RwLock};
 
 #[double]
 use crate::client::Client;
@@ -28,9 +29,7 @@ use crate::conf::{ClientOption, SimpleConsumerOption};
 use crate::error::{ClientError, ErrorKind};
 use crate::model::common::{ClientType, FilterExpression};
 use crate::model::message::{AckMessageEntry, MessageView};
-use crate::util::{
-    build_endpoints_by_message_queue, build_simple_consumer_settings, 
select_message_queue,
-};
+use crate::util::{build_endpoints_by_message_queue, select_message_queue};
 use crate::{log, pb};
 
 /// [`SimpleConsumer`] is a lightweight consumer to consume messages from 
RocketMQ proxy.
@@ -46,7 +45,7 @@ use crate::{log, pb};
 pub struct SimpleConsumer {
     option: SimpleConsumerOption,
     logger: Logger,
-    client: Client,
+    client: Client<SimpleConsumerOption>,
     shutdown_tx: Option<oneshot::Sender<()>>,
 }
 
@@ -75,8 +74,8 @@ impl SimpleConsumer {
             ..client_option
         };
         let logger = log::logger(option.logging_format());
-        let settings = build_simple_consumer_settings(&option, &client_option);
-        let client = Client::new(&logger, client_option, settings)?;
+        let settings = Arc::new(RwLock::new(option.clone()));
+        let client = Client::<SimpleConsumerOption>::new(&logger, 
client_option, settings)?;
         Ok(SimpleConsumer {
             option,
             logger,
@@ -230,7 +229,7 @@ mod tests {
     async fn simple_consumer_start() -> Result<(), ClientError> {
         let _m = crate::client::tests::MTX.lock();
 
-        let ctx = Client::new_context();
+        let ctx = Client::<SimpleConsumerOption>::new_context();
         ctx.expect().return_once(|_, _, _| {
             let mut client = Client::default();
             client.expect_topic_route().returning(|_, _| {
diff --git a/rust/src/util.rs b/rust/src/util.rs
index ac935b6b..1e5cf11c 100644
--- a/rust/src/util.rs
+++ b/rust/src/util.rs
@@ -14,16 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-use once_cell::sync::Lazy;
 use std::hash::Hasher;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
 
-use crate::conf::{ClientOption, ProducerOption, SimpleConsumerOption};
+use once_cell::sync::Lazy;
 use siphasher::sip::SipHasher24;
 
+use crate::conf::{ProducerOption, SimpleConsumerOption};
 use crate::error::{ClientError, ErrorKind};
-use crate::model::common::{Endpoints, Route};
+use crate::model::common::{ClientType, Endpoints, Route};
 use crate::pb::settings::PubSub;
 use crate::pb::telemetry_command::Command;
 use crate::pb::{
@@ -84,10 +84,7 @@ pub(crate) fn build_endpoints_by_message_queue(
     Ok(Endpoints::from_pb_endpoints(broker.endpoints.unwrap()))
 }
 
-pub(crate) fn build_producer_settings(
-    option: &ProducerOption,
-    client_options: &ClientOption,
-) -> TelemetryCommand {
+pub(crate) fn build_producer_settings(option: &ProducerOption) -> 
TelemetryCommand {
     let topics = option
         .topics()
         .clone()
@@ -101,10 +98,10 @@ pub(crate) fn build_producer_settings(
     let platform = os_info::get();
     TelemetryCommand {
         command: Some(Command::Settings(Settings {
-            client_type: Some(client_options.client_type.clone() as i32),
+            client_type: Some(ClientType::Producer as i32),
             request_timeout: Some(prost_types::Duration {
-                seconds: client_options.timeout().as_secs() as i64,
-                nanos: client_options.timeout().subsec_nanos() as i32,
+                seconds: option.timeout().as_secs() as i64,
+                nanos: option.timeout().subsec_nanos() as i32,
             }),
             pub_sub: Some(PubSub::Publishing(Publishing {
                 topics,
@@ -123,17 +120,14 @@ pub(crate) fn build_producer_settings(
     }
 }
 
-pub(crate) fn build_simple_consumer_settings(
-    option: &SimpleConsumerOption,
-    client_option: &ClientOption,
-) -> TelemetryCommand {
+pub(crate) fn build_simple_consumer_settings(option: &SimpleConsumerOption) -> 
TelemetryCommand {
     let platform = os_info::get();
     TelemetryCommand {
         command: Some(Command::Settings(Settings {
-            client_type: Some(client_option.client_type.clone() as i32),
+            client_type: Some(ClientType::SimpleConsumer as i32),
             request_timeout: Some(prost_types::Duration {
-                seconds: client_option.timeout().as_secs() as i64,
-                nanos: client_option.timeout().subsec_nanos() as i32,
+                seconds: option.timeout().as_secs() as i64,
+                nanos: option.timeout().subsec_nanos() as i32,
             }),
             pub_sub: Some(PubSub::Subscription(Subscription {
                 group: Some(Resource {
@@ -144,8 +138,8 @@ pub(crate) fn build_simple_consumer_settings(
                 fifo: Some(false),
                 receive_batch_size: None,
                 long_polling_timeout: Some(prost_types::Duration {
-                    seconds: client_option.long_polling_timeout().as_secs() as 
i64,
-                    nanos: client_option.long_polling_timeout().subsec_nanos() 
as i32,
+                    seconds: option.long_polling_timeout().as_secs() as i64,
+                    nanos: option.long_polling_timeout().subsec_nanos() as i32,
                 }),
             })),
             user_agent: Some(Ua {
@@ -162,11 +156,12 @@ pub(crate) fn build_simple_consumer_settings(
 
 #[cfg(test)]
 mod tests {
+    use std::sync::atomic::AtomicUsize;
+    use std::sync::Arc;
+
     use crate::model::common::Route;
     use crate::pb;
     use crate::pb::{Broker, MessageQueue};
-    use std::sync::atomic::AtomicUsize;
-    use std::sync::Arc;
 
     use super::*;
 
@@ -272,11 +267,11 @@ mod tests {
 
     #[test]
     fn util_build_producer_settings() {
-        build_producer_settings(&ProducerOption::default(), 
&ClientOption::default());
+        build_producer_settings(&ProducerOption::default());
     }
 
     #[test]
     fn util_build_simple_consumer_settings() {
-        build_simple_consumer_settings(&SimpleConsumerOption::default(), 
&ClientOption::default());
+        build_simple_consumer_settings(&SimpleConsumerOption::default());
     }
 }

Reply via email to