This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new 2d3cdf73 fix(rust): fix session life cycle (#646) 2d3cdf73 is described below commit 2d3cdf735e27a3c3a9e1786d0ee7c8318ba1578c Author: SSpirits <ad...@lv5.moe> AuthorDate: Wed Dec 6 19:35:13 2023 +0800 fix(rust): fix session life cycle (#646) * fix(rust): fix session life cycle Signed-off-by: SSpirits <ad...@lv5.moe> * fix(rust): fix session life cycle Signed-off-by: SSpirits <ad...@lv5.moe> --------- Signed-off-by: SSpirits <ad...@lv5.moe> --- rust/src/client.rs | 4 ++-- rust/src/session.rs | 13 +++++++------ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/rust/src/client.rs b/rust/src/client.rs index 69000be2..91bd3692 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -190,14 +190,14 @@ impl Client { }, command = telemetry_command_rx.recv() => { if let Some(command) = command { - let result = Self::handle_telemetry_command(rpc_client.clone(), &transaction_checker, endpoints.clone(), command).await; + let result = Self::handle_telemetry_command(rpc_client.shadow_session(), &transaction_checker, endpoints.clone(), command).await; if let Err(error) = result { error!(logger, "handle telemetry command failed: {:?}", error); } } }, _ = &mut shutdown_rx => { - debug!(logger, "receive shutdown signal, stop heartbeat task and telemetry command handler"); + info!(logger, "receive shutdown signal, stop heartbeat task and telemetry command handler"); break; } } diff --git a/rust/src/session.rs b/rust/src/session.rs index d54894d8..c69e9ee9 100644 --- a/rust/src/session.rs +++ b/rust/src/session.rs @@ -102,8 +102,8 @@ pub(crate) struct Session { shutdown_tx: Option<oneshot::Sender<()>>, } -impl Clone for Session { - fn clone(&self) -> Self { +impl Session { + pub(crate) fn shadow_session(&self) -> Self { Session { logger: self.logger.clone(), client_id: self.client_id.clone(), @@ -580,7 +580,7 @@ impl SessionManager { let mut session_map = self.session_map.lock().await; let endpoint_url = endpoints.endpoint_url().to_string(); return if session_map.contains_key(&endpoint_url) { - Ok(session_map.get(&endpoint_url).unwrap().clone()) + Ok(session_map.get(&endpoint_url).unwrap().shadow_session()) } else { let mut session = Session::new( &self.logger, @@ -590,8 +590,9 @@ impl SessionManager { ) .await?; session.start(settings, telemetry_command_tx).await?; - session_map.insert(endpoint_url.clone(), session.clone()); - Ok(session) + let shadow_session = session.shadow_session(); + session_map.insert(endpoint_url.clone(), session); + Ok(shadow_session) }; } @@ -599,7 +600,7 @@ impl SessionManager { let session_map = self.session_map.lock().await; let mut sessions = Vec::new(); for (_, session) in session_map.iter() { - sessions.push(session.clone()); + sessions.push(session.shadow_session()); } Ok(sessions) }