This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d3cdf73 fix(rust): fix session life cycle (#646)
2d3cdf73 is described below

commit 2d3cdf735e27a3c3a9e1786d0ee7c8318ba1578c
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Wed Dec 6 19:35:13 2023 +0800

    fix(rust): fix session life cycle (#646)
    
    * fix(rust): fix session life cycle
    
    Signed-off-by: SSpirits <ad...@lv5.moe>
    
    * fix(rust): fix session life cycle
    
    Signed-off-by: SSpirits <ad...@lv5.moe>
    
    ---------
    
    Signed-off-by: SSpirits <ad...@lv5.moe>
---
 rust/src/client.rs  |  4 ++--
 rust/src/session.rs | 13 +++++++------
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/rust/src/client.rs b/rust/src/client.rs
index 69000be2..91bd3692 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -190,14 +190,14 @@ impl Client {
                     },
                     command = telemetry_command_rx.recv() => {
                         if let Some(command) = command {
-                            let result = Self::handle_telemetry_command(rpc_client.clone(), &transaction_checker, endpoints.clone(), command).await;
+                            let result = Self::handle_telemetry_command(rpc_client.shadow_session(), &transaction_checker, endpoints.clone(), command).await;
                             if let Err(error) = result {
                                 error!(logger, "handle telemetry command failed: {:?}", error);
                             }
                         }
                     },
                     _ = &mut shutdown_rx => {
-                        debug!(logger, "receive shutdown signal, stop heartbeat task and telemetry command handler");
+                        info!(logger, "receive shutdown signal, stop heartbeat task and telemetry command handler");
                         break;
                     }
                 }
diff --git a/rust/src/session.rs b/rust/src/session.rs
index d54894d8..c69e9ee9 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -102,8 +102,8 @@ pub(crate) struct Session {
     shutdown_tx: Option<oneshot::Sender<()>>,
 }
 
-impl Clone for Session {
-    fn clone(&self) -> Self {
+impl Session {
+    pub(crate) fn shadow_session(&self) -> Self {
         Session {
             logger: self.logger.clone(),
             client_id: self.client_id.clone(),
@@ -580,7 +580,7 @@ impl SessionManager {
         let mut session_map = self.session_map.lock().await;
         let endpoint_url = endpoints.endpoint_url().to_string();
         return if session_map.contains_key(&endpoint_url) {
-            Ok(session_map.get(&endpoint_url).unwrap().clone())
+            Ok(session_map.get(&endpoint_url).unwrap().shadow_session())
         } else {
             let mut session = Session::new(
                 &self.logger,
@@ -590,8 +590,9 @@ impl SessionManager {
             )
             .await?;
             session.start(settings, telemetry_command_tx).await?;
-            session_map.insert(endpoint_url.clone(), session.clone());
-            Ok(session)
+            let shadow_session = session.shadow_session();
+            session_map.insert(endpoint_url.clone(), session);
+            Ok(shadow_session)
         };
     }
 
@@ -599,7 +600,7 @@ impl SessionManager {
         let session_map = self.session_map.lock().await;
         let mut sessions = Vec::new();
         for (_, session) in session_map.iter() {
-            sessions.push(session.clone());
+            sessions.push(session.shadow_session());
         }
         Ok(sessions)
     }

Reply via email to