This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 66e640d Switch TokenCache to RWLock (#648)
66e640d is described below
commit 66e640d1ba7b79d878e9423924ed5bff4f321b75
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Mar 2 10:05:48 2026 +0000
Switch TokenCache to RWLock (#648)
* Switch TokenCache to RWLock
* Clippy
---
src/client/token.rs | 53 ++++++++++++++++++++++++++++++++++-------------------
1 file changed, 34 insertions(+), 19 deletions(-)
diff --git a/src/client/token.rs b/src/client/token.rs
index 81ffc11..5b680bd 100644
--- a/src/client/token.rs
+++ b/src/client/token.rs
@@ -17,7 +17,7 @@
use std::future::Future;
use std::time::{Duration, Instant};
-use tokio::sync::Mutex;
+use tokio::sync::RwLock;
/// A temporary authentication token with an associated expiry
#[derive(Debug, Clone)]
@@ -33,11 +33,17 @@ pub(crate) struct TemporaryToken<T> {
/// [`TemporaryToken`] based on its expiry
#[derive(Debug)]
pub(crate) struct TokenCache<T> {
- cache: Mutex<Option<(TemporaryToken<T>, Instant)>>,
+ cache: RwLock<Option<CacheEntry<T>>>,
min_ttl: Duration,
fetch_backoff: Duration,
}
+#[derive(Debug)]
+struct CacheEntry<T> {
+ token: TemporaryToken<T>,
+ fetched_at: Instant,
+}
+
impl<T> Default for TokenCache<T> {
fn default() -> Self {
Self {
@@ -50,7 +56,7 @@ impl<T> Default for TokenCache<T> {
}
}
-impl<T: Clone + Send> TokenCache<T> {
+impl<T: Clone + Send + Sync> TokenCache<T> {
/// Override the minimum remaining TTL for a cached token to be used
#[cfg(any(feature = "aws", feature = "gcp"))]
pub(crate) fn with_min_ttl(self, min_ttl: Duration) -> Self {
@@ -63,26 +69,35 @@ impl<T: Clone + Send> TokenCache<T> {
Fut: Future<Output = Result<TemporaryToken<T>, E>> + Send,
{
let now = Instant::now();
- let mut locked = self.cache.lock().await;
-
- if let Some((cached, fetched_at)) = locked.as_ref() {
- match cached.expiry {
- Some(ttl) => {
- if ttl.checked_duration_since(now).unwrap_or_default() > self.min_ttl ||
- // if we've recently attempted to fetch this token and it's not actually
- // expired, we'll wait to re-fetch it and return the cached one
- (fetched_at.elapsed() < self.fetch_backoff && ttl.checked_duration_since(now).is_some())
- {
- return Ok(cached.token.clone());
- }
- }
- None => return Ok(cached.token.clone()),
- }
+ let is_token_valid = |entry: &CacheEntry<T>| {
+ entry.token.expiry.is_none_or(|ttl| {
+ ttl.checked_duration_since(now).unwrap_or_default() > self.min_ttl ||
+ // if we've recently attempted to fetch this token and it's not actually
+ // expired, we'll wait to re-fetch it and return the cached one
+ (entry.fetched_at.elapsed() < self.fetch_backoff && ttl > now)
+ })
+ };
+
+ if let Some(cache) = self.cache.read().await.as_ref()
+ && is_token_valid(cache)
+ {
+ return Ok(cache.token.token.clone());
+ }
+
+ let mut guard = self.cache.write().await;
+ if let Some(cache) = guard.as_ref()
+ && is_token_valid(cache)
+ {
+ // Refresh race
+ return Ok(cache.token.token.clone());
}
let cached = f().await?;
let token = cached.token.clone();
- *locked = Some((cached, Instant::now()));
+ *guard = Some(CacheEntry {
+ token: cached,
+ fetched_at: Instant::now(),
+ });
Ok(token)
}