This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
     new 0149785a [rust] check route equality and keep existing route in case failure to query route from remote (#710)
0149785a is described below

commit 0149785a7d11ed9d8bccca3c2049283bfedd4e7c
Author: Qiping Luo <qiping...@tencent.com>
AuthorDate: Mon Apr 1 14:24:46 2024 +0800

    [rust] check route equality and keep existing route in case failure to query route from remote (#710)
---
 rust/src/client.rs | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 83 insertions(+), 5 deletions(-)

diff --git a/rust/src/client.rs b/rust/src/client.rs
index 4183256f..f3bcb283 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -397,10 +397,17 @@ where
                     "query route for topic={} success: route={:?}", topic, route
                 );
                 let route = Arc::new(route);
-                let prev = self
-                    .route_table
-                    .lock()
-                    .insert(topic.to_owned(), RouteStatus::Found(Arc::clone(&route)));
+                let mut route_table_lock = self.route_table.lock();
+
+                // if message queues in previous and new route are the same, just keep the previous.
+                if let Some(RouteStatus::Found(prev)) = route_table_lock.get(topic) {
+                    if prev.queue == route.queue {
+                        return Ok(Arc::clone(prev));
+                    }
+                }
+
+                let prev =
+                    route_table_lock.insert(topic.to_owned(), RouteStatus::Found(Arc::clone(&route)));
                 info!(self.logger, "update route for topic={}", topic);
 
                 if let Some(RouteStatus::Querying(Some(mut v))) = prev {
@@ -415,7 +422,12 @@ where
                     self.logger,
                     "query route for topic={} failed: error={}", topic, err
                 );
-                let prev = self.route_table.lock().remove(topic);
+                let mut route_table_lock = self.route_table.lock();
+                // keep the existing route if error occurs.
+                if let Some(RouteStatus::Found(prev)) = route_table_lock.get(topic) {
+                    return Ok(Arc::clone(prev));
+                }
+                let prev = route_table_lock.remove(topic);
                 if let Some(RouteStatus::Querying(Some(mut v))) = prev {
                     for item in v.drain(..) {
                         let _ = item.send(Err(ClientError::new(
@@ -925,6 +937,72 @@ pub(crate) mod tests {
         awaitility::at_most(Duration::from_secs(1)).until(|| handle.is_finished());
     }
 
+    #[tokio::test]
+    async fn client_query_existing_route_with_failed_request() {
+        let client = new_client_for_test();
+        let message_queues = if let Ok(QueryRouteResponse {
+            status: _,
+            message_queues,
+        }) = new_topic_route_response()
+        {
+            message_queues
+        } else {
+            vec![]
+        };
+        client.route_table.lock().insert(
+            "DefaultCluster".to_string(),
+            RouteStatus::Found(Arc::new(Route {
+                index: AtomicUsize::new(0),
+                queue: message_queues,
+            })),
+        );
+
+        let mut mock = session::MockRPCClient::new();
+        mock.expect_query_route().return_once(|_| {
+            sleep(Duration::from_millis(200));
+            Box::pin(futures::future::ready(Err(ClientError::new(
+                ErrorKind::Server,
+                "server error",
+                "test",
+            ))))
+        });
+
+        let result = client.topic_route_inner(mock, "DefaultCluster").await;
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn client_update_same_route() {
+        let client = new_client_for_test();
+
+        let mut mock = session::MockRPCClient::new();
+        mock.expect_query_route()
+            .return_once(|_| Box::pin(futures::future::ready(new_topic_route_response())));
+
+        let result = client.topic_route_inner(mock, "DefaultCluster").await;
+        assert!(result.is_ok());
+
+        let route = result.unwrap();
+        assert!(!route.queue.is_empty());
+        route.index.fetch_add(1, Ordering::Relaxed);
+
+        let topic = &route.queue[0].topic;
+        assert!(topic.is_some());
+
+        let topic = topic.clone().unwrap();
+        assert_eq!(topic.name, "DefaultCluster");
+        assert_eq!(topic.resource_namespace, "default");
+
+        mock = session::MockRPCClient::new();
+        mock.expect_query_route()
+            .return_once(|_| Box::pin(futures::future::ready(new_topic_route_response())));
+
+        let result2 = client.topic_route_inner(mock, "DefaultCluster").await;
+        assert!(result2.is_ok());
+
+        let route2 = result2.unwrap();
+        assert_eq!(1, route2.index.load(Ordering::Relaxed));
+    }
     #[tokio::test]
     async fn client_heartbeat() {
         let response = Ok(HeartbeatResponse {