Copilot commented on code in PR #369:
URL: https://github.com/apache/fluss-rust/pull/369#discussion_r2845584079


##########
crates/fluss/src/client/connection.rs:
##########
@@ -60,7 +62,27 @@ impl FlussConnection {
     }
 
     pub async fn get_admin(&self) -> Result<FlussAdmin> {
-        FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await
+        // 1. Fast path: Attempt to acquire a read lock to check if the admin already exists.
+        if let Some(admin) = self.admin_client.read().as_ref() {
+            return Ok(admin.as_ref().clone());
+        }
+
+        // 2. Initialize the admin outside the guard lock
+        let new_admin = Arc::new(admin::FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await?);
+
+        // 3. Slow path: Acquire the write lock.
+        let mut admin_guard = self.admin_client.write();
+

Review Comment:
   Because `FlussAdmin::new(..).await` runs before the write lock is taken, 
concurrent first-time callers can each perform the full network connection 
setup and spawn RPC tasks; all but one of those instances are discarded once 
the write lock is acquired. To avoid the redundant connections and work under 
concurrency, consider an async single-init primitive (e.g., 
`tokio::sync::OnceCell` with `get_or_try_init`, or an async mutex held across 
initialization) so that only one initialization runs and other callers await it.



##########
crates/fluss/src/client/connection.rs:
##########
@@ -60,7 +62,27 @@ impl FlussConnection {
     }
 
     pub async fn get_admin(&self) -> Result<FlussAdmin> {
-        FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await
+        // 1. Fast path: Attempt to acquire a read lock to check if the admin already exists.
+        if let Some(admin) = self.admin_client.read().as_ref() {
+            return Ok(admin.as_ref().clone());
+        }
+
+        // 2. Initialize the admin outside the guard lock
+        let new_admin = Arc::new(admin::FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await?);
+
+        // 3. Slow path: Acquire the write lock.
+        let mut admin_guard = self.admin_client.write();
+
+        // 4. Double-check: Another thread might have initialized the admin
+        // while this thread was waiting for the write lock.
+        if let Some(admin) = admin_guard.as_ref() {
+            return Ok(admin.as_ref().clone());
+        }
+
+        // 5. Store and return the newly created admin.
+        let result = new_admin.as_ref().clone();
+        *admin_guard = Some(new_admin);
+        Ok(result)

Review Comment:
   `get_admin()` now caches a single `FlussAdmin`, but `FlussAdmin` internally 
holds an `admin_gateway: ServerConnection` created once in `FlussAdmin::new()`. 
`RpcClient::get_connection()` has logic to reconnect when a cached connection 
is poisoned, but a cached `FlussAdmin` will keep reusing the same 
`admin_gateway`, so a transient connection failure can become unrecoverable 
(calling `get_admin()` again will return the same broken gateway). Consider 
refactoring `FlussAdmin` to acquire the coordinator connection from `RpcClient` 
per request (or on demand when the current connection is poisoned), or add an 
invalidation/refresh path for the cached admin instance.
   ```suggestion
           // Create a new FlussAdmin on each call so that it can obtain a fresh
           // coordinator connection from RpcClient and recover from transient
           // connection failures.
           admin::FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await
   ```
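
As an alternative to dropping the cache entirely, the invalidation/refresh path mentioned above might look roughly like the following. This is a minimal std-only sketch under stated assumptions: `Admin`, `AdminCache`, `mark_poisoned`, and `is_poisoned` are hypothetical names, and the real `FlussAdmin` and `ServerConnection` APIs may expose poisoning differently or not at all.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a `FlussAdmin` that owns one gateway connection.
pub struct Admin {
    pub generation: usize,
    poisoned: AtomicBool,
}

impl Admin {
    /// Simulates the gateway connection breaking (e.g. after an I/O error).
    pub fn mark_poisoned(&self) {
        self.poisoned.store(true, Ordering::SeqCst);
    }

    pub fn is_poisoned(&self) -> bool {
        self.poisoned.load(Ordering::SeqCst)
    }
}

/// Hypothetical cache that never hands out a poisoned admin.
pub struct AdminCache {
    slot: Mutex<Option<Arc<Admin>>>,
    builds: AtomicUsize,
}

impl AdminCache {
    pub fn new() -> Self {
        AdminCache {
            slot: Mutex::new(None),
            builds: AtomicUsize::new(0),
        }
    }

    /// Placeholder for `FlussAdmin::new`, which would open a fresh connection.
    fn build(&self) -> Arc<Admin> {
        let generation = self.builds.fetch_add(1, Ordering::SeqCst);
        Arc::new(Admin {
            generation,
            poisoned: AtomicBool::new(false),
        })
    }

    /// Returns the cached admin while it is healthy; otherwise replaces it.
    pub fn get(&self) -> Arc<Admin> {
        let mut slot = self.slot.lock().unwrap();
        if let Some(admin) = slot.as_ref() {
            if !admin.is_poisoned() {
                return Arc::clone(admin);
            }
        }
        // Cache is empty or the cached admin is broken: rebuild and store.
        let fresh = self.build();
        *slot = Some(Arc::clone(&fresh));
        fresh
    }
}
```

Callers that still hold the old `Arc` keep the broken instance, so this only helps code that goes back through the cache after a failure.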



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
