This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 0149785a [rust] check route equality and keep existing route in case failure to query route from remote (#710)
0149785a is described below

commit 0149785a7d11ed9d8bccca3c2049283bfedd4e7c
Author: Qiping Luo <qiping...@tencent.com>
AuthorDate: Mon Apr 1 14:24:46 2024 +0800

    [rust] check route equality and keep existing route in case failure to query route from remote (#710)
---
 rust/src/client.rs | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 83 insertions(+), 5 deletions(-)

diff --git a/rust/src/client.rs b/rust/src/client.rs
index 4183256f..f3bcb283 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -397,10 +397,17 @@ where
                 "query route for topic={} success: route={:?}", topic, route
             );
             let route = Arc::new(route);
-            let prev = self
-                .route_table
-                .lock()
-                .insert(topic.to_owned(), RouteStatus::Found(Arc::clone(&route)));
+            let mut route_table_lock = self.route_table.lock();
+
+            // if message queues in previous and new route are the same, just keep the previous.
+            if let Some(RouteStatus::Found(prev)) = route_table_lock.get(topic) {
+                if prev.queue == route.queue {
+                    return Ok(Arc::clone(prev));
+                }
+            }
+
+            let prev =
+                route_table_lock.insert(topic.to_owned(), RouteStatus::Found(Arc::clone(&route)));
             info!(self.logger, "update route for topic={}", topic);
 
             if let Some(RouteStatus::Querying(Some(mut v))) = prev {
@@ -415,7 +422,12 @@ where
                 self.logger,
                 "query route for topic={} failed: error={}", topic, err
             );
-            let prev = self.route_table.lock().remove(topic);
+            let mut route_table_lock = self.route_table.lock();
+            // keep the existing route if error occurs.
+            if let Some(RouteStatus::Found(prev)) = route_table_lock.get(topic) {
+                return Ok(Arc::clone(prev));
+            }
+            let prev = route_table_lock.remove(topic);
             if let Some(RouteStatus::Querying(Some(mut v))) = prev {
                 for item in v.drain(..) {
                     let _ = item.send(Err(ClientError::new(
@@ -925,6 +937,72 @@ pub(crate) mod tests {
         awaitility::at_most(Duration::from_secs(1)).until(|| handle.is_finished());
     }
 
+    #[tokio::test]
+    async fn client_query_existing_route_with_failed_request() {
+        let client = new_client_for_test();
+        let message_queues = if let Ok(QueryRouteResponse {
+            status: _,
+            message_queues,
+        }) = new_topic_route_response()
+        {
+            message_queues
+        } else {
+            vec![]
+        };
+        client.route_table.lock().insert(
+            "DefaultCluster".to_string(),
+            RouteStatus::Found(Arc::new(Route {
+                index: AtomicUsize::new(0),
+                queue: message_queues,
+            })),
+        );
+
+        let mut mock = session::MockRPCClient::new();
+        mock.expect_query_route().return_once(|_| {
+            sleep(Duration::from_millis(200));
+            Box::pin(futures::future::ready(Err(ClientError::new(
+                ErrorKind::Server,
+                "server error",
+                "test",
+            ))))
+        });
+
+        let result = client.topic_route_inner(mock, "DefaultCluster").await;
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn client_update_same_route() {
+        let client = new_client_for_test();
+
+        let mut mock = session::MockRPCClient::new();
+        mock.expect_query_route()
+            .return_once(|_| Box::pin(futures::future::ready(new_topic_route_response())));
+
+        let result = client.topic_route_inner(mock, "DefaultCluster").await;
+        assert!(result.is_ok());
+
+        let route = result.unwrap();
+        assert!(!route.queue.is_empty());
+        route.index.fetch_add(1, Ordering::Relaxed);
+
+        let topic = &route.queue[0].topic;
+        assert!(topic.is_some());
+
+        let topic = topic.clone().unwrap();
+        assert_eq!(topic.name, "DefaultCluster");
+        assert_eq!(topic.resource_namespace, "default");
+
+        mock = session::MockRPCClient::new();
+        mock.expect_query_route()
+            .return_once(|_| Box::pin(futures::future::ready(new_topic_route_response())));
+
+        let result2 = client.topic_route_inner(mock, "DefaultCluster").await;
+        assert!(result2.is_ok());
+
+        let route2 = result2.unwrap();
+        assert_eq!(1, route2.index.load(Ordering::Relaxed));
+    }
     #[tokio::test]
     async fn client_heartbeat() {
         let response = Ok(HeartbeatResponse {

Reply via email to