This is an automated email from the ASF dual-hosted git repository.

mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 66e640d  Switch TokenCache to RWLock (#648)
66e640d is described below

commit 66e640d1ba7b79d878e9423924ed5bff4f321b75
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Mar 2 10:05:48 2026 +0000

    Switch TokenCache to RWLock (#648)
    
    * Switch TokenCache to RWLock
    
    * Clippy
---
 src/client/token.rs | 53 ++++++++++++++++++++++++++++++++++-------------------
 1 file changed, 34 insertions(+), 19 deletions(-)

diff --git a/src/client/token.rs b/src/client/token.rs
index 81ffc11..5b680bd 100644
--- a/src/client/token.rs
+++ b/src/client/token.rs
@@ -17,7 +17,7 @@
 
 use std::future::Future;
 use std::time::{Duration, Instant};
-use tokio::sync::Mutex;
+use tokio::sync::RwLock;
 
 /// A temporary authentication token with an associated expiry
 #[derive(Debug, Clone)]
@@ -33,11 +33,17 @@ pub(crate) struct TemporaryToken<T> {
 /// [`TemporaryToken`] based on its expiry
 #[derive(Debug)]
 pub(crate) struct TokenCache<T> {
-    cache: Mutex<Option<(TemporaryToken<T>, Instant)>>,
+    cache: RwLock<Option<CacheEntry<T>>>,
     min_ttl: Duration,
     fetch_backoff: Duration,
 }
 
+#[derive(Debug)]
+struct CacheEntry<T> {
+    token: TemporaryToken<T>,
+    fetched_at: Instant,
+}
+
 impl<T> Default for TokenCache<T> {
     fn default() -> Self {
         Self {
@@ -50,7 +56,7 @@ impl<T> Default for TokenCache<T> {
     }
 }
 
-impl<T: Clone + Send> TokenCache<T> {
+impl<T: Clone + Send + Sync> TokenCache<T> {
     /// Override the minimum remaining TTL for a cached token to be used
     #[cfg(any(feature = "aws", feature = "gcp"))]
     pub(crate) fn with_min_ttl(self, min_ttl: Duration) -> Self {
@@ -63,26 +69,35 @@ impl<T: Clone + Send> TokenCache<T> {
         Fut: Future<Output = Result<TemporaryToken<T>, E>> + Send,
     {
         let now = Instant::now();
-        let mut locked = self.cache.lock().await;
-
-        if let Some((cached, fetched_at)) = locked.as_ref() {
-            match cached.expiry {
-                Some(ttl) => {
-                    if ttl.checked_duration_since(now).unwrap_or_default() > 
self.min_ttl ||
-                        // if we've recently attempted to fetch this token and 
it's not actually
-                        // expired, we'll wait to re-fetch it and return the 
cached one
-                        (fetched_at.elapsed() < self.fetch_backoff && 
ttl.checked_duration_since(now).is_some())
-                    {
-                        return Ok(cached.token.clone());
-                    }
-                }
-                None => return Ok(cached.token.clone()),
-            }
+        let is_token_valid = |entry: &CacheEntry<T>| {
+            entry.token.expiry.is_none_or(|ttl| {
+                ttl.checked_duration_since(now).unwrap_or_default() > 
self.min_ttl ||
+                // if we've recently attempted to fetch this token and it's 
not actually
+                // expired, we'll wait to re-fetch it and return the cached one
+                (entry.fetched_at.elapsed() < self.fetch_backoff && ttl > now)
+            })
+        };
+
+        if let Some(cache) = self.cache.read().await.as_ref()
+            && is_token_valid(cache)
+        {
+            return Ok(cache.token.token.clone());
+        }
+
+        let mut guard = self.cache.write().await;
+        if let Some(cache) = guard.as_ref()
+            && is_token_valid(cache)
+        {
+            // Refresh race
+            return Ok(cache.token.token.clone());
         }
 
         let cached = f().await?;
         let token = cached.token.clone();
-        *locked = Some((cached, Instant::now()));
+        *guard = Some(CacheEntry {
+            token: cached,
+            fetched_at: Instant::now(),
+        });
 
         Ok(token)
     }

Reply via email to